diff options
Diffstat (limited to 'kamon-akka/src/main')
11 files changed, 984 insertions, 0 deletions
diff --git a/kamon-akka/src/main/resources/META-INF/aop.xml b/kamon-akka/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..46e63f91 --- /dev/null +++ b/kamon-akka/src/main/resources/META-INF/aop.xml @@ -0,0 +1,34 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <aspects> + + <!-- Actors --> + <aspect name="akka.kamon.instrumentation.TraceContextIntoRepointableActorRefMixin"/> + <aspect name="akka.kamon.instrumentation.TraceContextIntoSystemMessageMixin"/> + <aspect name="akka.kamon.instrumentation.ActorSystemMessageInstrumentation"/> + <aspect name="akka.kamon.instrumentation.TraceContextIntoEnvelopeMixin"/> + <aspect name="akka.kamon.instrumentation.MetricsIntoActorCellsMixin"/> + <aspect name="akka.kamon.instrumentation.ActorCellInstrumentation"/> + <aspect name="akka.kamon.instrumentation.RoutedActorCellInstrumentation"/> + <aspect name="akka.kamon.instrumentation.ActorLoggingInstrumentation"/> + + <!-- Dispatchers --> + <aspect name="akka.kamon.instrumentation.DispatcherInstrumentation"/> + <aspect name="akka.kamon.instrumentation.DispatcherMetricCollectionInfoIntoDispatcherMixin"/> + + <!-- Patterns --> + <aspect name="akka.kamon.instrumentation.AskPatternInstrumentation"/> + </aspects> + + <weaver> + <include within="akka..*"/> + + <!-- For some weird reason ByteString produces a java.lang.VerifyError after going through the weaver. --> + <exclude within="akka.util.ByteString"/> + + <!-- Exclude CallingThreadDispatcher, is only for test purposes --> + <exclude within="akka.testkit.CallingThreadDispatcher"/> + </weaver> + +</aspectj>
\ No newline at end of file diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf new file mode 100644 index 00000000..4f742ee6 --- /dev/null +++ b/kamon-akka/src/main/resources/reference.conf @@ -0,0 +1,51 @@ +# ================================== # +# Kamon-Akka Reference Configuration # +# ================================== # + +kamon { + akka { + # If ask-pattern-timeout-warning is enabled, a WARN level log message will be generated if a future generated by the `ask` + # pattern fails with a `AskTimeoutException` and the log message will contain information depending of the strategy selected. + # strategies: + # - off: nothing to do. + # - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation. + # - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created. + ask-pattern-timeout-warning = off + + # Default dispatcher for all akka module operations + dispatcher = ${kamon.default-dispatcher} + } + + metrics.precision { + actor { + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} + } + + router { + routing-time = ${kamon.metrics.precision.default-histogram-precision} + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + } + + dispatcher { + maximum-pool-size { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + running-thread-count { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + queued-task-count { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + pool-size { + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + } + } +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala new file mode 100644 index 00000000..b22f7fa9 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package kamon.akka + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram, MinMaxCounter } + +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, + errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.collect(context), + timeInMailbox.collect(context), + mailboxSize.collect(context), + errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + mailboxSize.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, + mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = ActorMetricSnapshot + + def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + mailboxSize.merge(that.mailboxSize, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (ProcessingTime -> processingTime), + (MailboxSize -> mailboxSize), + (TimeInMailbox -> 
timeInMailbox), + (Errors -> errors)) + } + + val Factory = ActorMetricGroupFactory +} + +case object ActorMetricGroupFactory extends MetricGroupFactory { + import kamon.akka.ActorMetrics._ + + type GroupRecorder = ActorMetricsRecorder + + def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { + val settings = config.getConfig("precision.actor") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + val mailboxSizeConfig = settings.getConfig("mailbox-size") + + new ActorMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + MinMaxCounter.fromConfig(mailboxSizeConfig, system), + Counter()) + } +} diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala new file mode 100644 index 00000000..bc013b63 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -0,0 +1,32 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package kamon.akka + +import akka.actor +import akka.actor._ +import kamon._ + +class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val config = system.settings.config.getConfig("kamon.akka") + val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning") + val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) +} + +object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = Akka + def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala new file mode 100644 index 00000000..64e16f96 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package kamon.akka + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric._ +import kamon.metric.instrument.Histogram + +case class DispatcherMetrics(name: String) extends MetricGroupIdentity { + val category = DispatcherMetrics +} + +object DispatcherMetrics extends MetricGroupCategory { + val name = "dispatcher" + + case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } + case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } + case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } + case object PoolSize extends MetricIdentity { val name = "pool-size" } + + case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, + queueTaskCount: Histogram, poolSize: Histogram) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.collect(context), + runningThreadCount.collect(context), + queueTaskCount.collect(context), + poolSize.collect(context)) + + def cleanup: Unit = {} + + } + + case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, + queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = DispatcherMetricSnapshot + + def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.merge(that.maximumPoolSize, context), + runningThreadCount.merge(that.runningThreadCount, context), + queueTaskCount.merge(that.queueTaskCount, context), + poolSize.merge(that.poolSize, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (MaximumPoolSize -> maximumPoolSize), + (RunningThreadCount -> 
runningThreadCount), + (QueueTaskCount -> queueTaskCount), + (PoolSize -> poolSize)) + } + + val Factory = DispatcherMetricGroupFactory +} + +case object DispatcherMetricGroupFactory extends MetricGroupFactory { + + import kamon.akka.DispatcherMetrics._ + + type GroupRecorder = DispatcherMetricRecorder + + def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { + val settings = config.getConfig("precision.dispatcher") + + val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") + val runningThreadCountConfig = settings.getConfig("running-thread-count") + val queueTaskCountConfig = settings.getConfig("queued-task-count") + val poolSizeConfig = settings.getConfig("pool-size") + + new DispatcherMetricRecorder( + Histogram.fromConfig(maximumPoolSizeConfig), + Histogram.fromConfig(runningThreadCountConfig), + Histogram.fromConfig(queueTaskCountConfig), + Histogram.fromConfig(poolSizeConfig)) + } + +} diff --git a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala new file mode 100644 index 00000000..2eedf764 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ +package kamon.akka + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram } + +case class RouterMetrics(name: String) extends MetricGroupIdentity { + val category = RouterMetrics +} + +object RouterMetrics extends MetricGroupCategory { + val name = "router" + + case object RoutingTime extends MetricIdentity { val name = "routing-time" } + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class RouterMetricsRecorder(routingTime: Histogram, processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot(routingTime.collect(context), processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) + + def cleanup: Unit = { + routingTime.cleanup + processingTime.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class RouterMetricSnapshot(routingTime: Histogram.Snapshot, processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = RouterMetricSnapshot + + def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot( + routingTime.merge(that.routingTime, context), + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + RoutingTime -> routingTime, + ProcessingTime -> processingTime, + TimeInMailbox -> timeInMailbox, + Errors -> errors) 
+ } + + val Factory = RouterMetricGroupFactory +} + +case object RouterMetricGroupFactory extends MetricGroupFactory { + + import kamon.akka.RouterMetrics._ + + type GroupRecorder = RouterMetricsRecorder + + def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { + val settings = config.getConfig("precision.router") + + val routingTimeConfig = settings.getConfig("routing-time") + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + + new RouterMetricsRecorder( + Histogram.fromConfig(routingTimeConfig), + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + Counter()) + } +} + diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala new file mode 100644 index 00000000..78d88583 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -0,0 +1,213 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import akka.actor._ +import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell +import kamon.Kamon +import kamon.akka.{ RouterMetrics, ActorMetrics } +import ActorMetrics.ActorMetricsRecorder +import RouterMetrics.RouterMetricsRecorder +import kamon.metric.Metrics +import kamon.trace._ +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorCellInstrumentation { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") + def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val metricsExtension = Kamon(Metrics)(system) + val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellMetrics.actorMetricIdentity = metricIdentity + cellMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + } + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] + + try { + 
TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + pjp.proceed() + } + } finally { + cellMetrics.actorMetricsRecorder.map { am ⇒ + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + + am.processingTime.record(processingTime) + am.timeInMailbox.record(timeInMailbox) + am.mailboxSize.decrement() + + // In case that this actor is behind a router, record the metrics for the router. + envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) + } + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {} + + @After("sendMessageInActorCell(cell, envelope)") + def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + cellMetrics.actorMetricsRecorder.map(_.mailboxSize.increment()) + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: ActorCell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: ActorCell): Unit = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + cellMetrics.actorMetricsRecorder.map { _ ⇒ + Kamon(Metrics)(cell.system).unregister(cellMetrics.actorMetricIdentity) + } + + // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. 
+ if (cell.isInstanceOf[RoutedActorCell]) { + val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + routedCellMetrics.routerMetricsRecorder.map { _ ⇒ + Kamon(Metrics)(cell.system).unregister(routedCellMetrics.routerMetricIdentity) + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") + def actorInvokeFailure(cell: ActorCell): Unit = {} + + @Before("actorInvokeFailure(cell)") + def beforeInvokeFailure(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + cellWithMetrics.actorMetricsRecorder.map(_.errors.increment()) + + // In case that this actor is behind a router, count the errors for the router as well. + val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] + envelope.routerMetricsRecorder.map(_.errors.increment()) + } +} + +@Aspect +class RoutedActorCellInstrumentation { + + @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)") + def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {} + + @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") + def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { + val metricsExtension = Kamon(Metrics)(system) + val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + + cellMetrics.routerMetricIdentity = metricIdentity + cellMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) + } + + @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def 
sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {} + + @Around("sendMessageInRouterActorCell(cell, envelope)") + def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = { + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] + + try { + TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + + // The router metrics recorder will only be picked up if the message is sent from a tracked router. + RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerMetricsRecorder) { + pjp.proceed() + } + } + } finally { + cellMetrics.routerMetricsRecorder map { routerRecorder ⇒ + routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) + } + } + } +} + +trait ActorCellMetrics { + var actorMetricIdentity: ActorMetrics = _ + var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ +} + +trait RoutedActorCellMetrics { + var routerMetricIdentity: RouterMetrics = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ +} + +trait RouterAwareEnvelope { + def routerMetricsRecorder: Option[RouterMetricsRecorder] +} + +object RouterAwareEnvelope { + import scala.util.DynamicVariable + private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetricsRecorder]](None) + + def default: RouterAwareEnvelope = new RouterAwareEnvelope { + val routerMetricsRecorder: Option[RouterMetricsRecorder] = dynamicRouterMetricsRecorder.value + } +} + +@Aspect +class MetricsIntoActorCellsMixin { + + @DeclareMixin("akka.actor.ActorCell") + def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} + + @DeclareMixin("akka.routing.RoutedActorCell") + def mixinActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} + +} + 
+@Aspect +class TraceContextIntoEnvelopeMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default + + @DeclareMixin("akka.dispatch.Envelope") + def mixinRouterAwareToEnvelope: RouterAwareEnvelope = RouterAwareEnvelope.default + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TimestampedTraceContextAware with RouterAwareEnvelope): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + ctx.routerMetricsRecorder + } +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..e0e5d316 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala @@ -0,0 +1,50 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import kamon.trace.logging.MdcKeysSupport +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorLoggingInstrumentation extends MdcKeysSupport { + + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") + def logEventCreation(event: TraceContextAware): Unit = {} + + @After("logEventCreation(event)") + def captureTraceContext(event: TraceContextAware): Unit = { + // Force initialization of TraceContextAware + event.traceContext + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { + TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { + withMdc { + pjp.proceed() + } + } + } +} diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala new file mode 100644 index 00000000..48016876 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala @@ -0,0 +1,80 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may 
not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorSystemMessageInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext + TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} + +@Aspect +class TraceContextIntoSystemMessageMixin { + + @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") + def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. 
+ ctx.traceContext + } +} + +@Aspect +class TraceContextIntoRepointableActorRefMixin { + + @DeclareMixin("akka.actor.RepointableActorRef") + def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } + + @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") + def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} + + @Around("repointableActorRefCreation(repointableActorRef)") + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { + TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { + pjp.proceed() + } + } +}
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package akka.kamon.instrumentation

import kamon.Kamon
import kamon.akka.Akka
import kamon.trace.{ TraceRecorder, TraceContext, EmptyTraceContext, TraceContextAware }
import akka.actor.{ ActorSystem, ActorRef }
import akka.event.Logging.Warning
import akka.pattern.AskTimeoutException
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
import org.aspectj.lang.reflect.SourceLocation
import scala.concurrent.Future
import scala.compat.Platform.EOL

/**
 * Hooks the `ask` pattern (`?` extension) and, depending on the configured
 * `kamon.akka.ask-pattern-timeout-warning` strategy, publishes a Warning to the event stream
 * when the resulting Future fails with an AskTimeoutException.
 */
@Aspect
class AskPatternInstrumentation {

  import AskPatternInstrumentation._

  @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, *)")
  def askableActorRefAsk(actor: ActorRef): Unit = {}

  /**
   * Proceeds with the original `ask` and, when a warning strategy is enabled, registers an
   * onFailure callback on the returned Future. Only runs when both a TraceContext and an
   * ActorSystem are available; otherwise the `ask` proceeds uninstrumented.
   */
  @Around("askableActorRefAsk(actor)")
  def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef): AnyRef =
    TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
      val akkaExtension = Kamon(Akka)(system)
      val future = pjp.proceed().asInstanceOf[Future[AnyRef]]

      val handler = akkaExtension.askPatternTimeoutWarning match {
        case "off"         ⇒ None
        case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system))
        case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system))
        // Treat any unrecognized setting as "off" instead of failing the ask with a MatchError.
        case other         ⇒ None
      }

      // foreach (not map): registering the callback is a side effect, the result is discarded.
      handler.foreach(future.onFailure(_)(akkaExtension.dispatcher))
      future

    } getOrElse (pjp.proceed())

  /**
   * Builds the handler invoked on AskTimeoutException. With a captured stack (heavyweight) the
   * warning message is the stack trace; otherwise (lightweight) it is the call-site info.
   */
  def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = {
    case _: AskTimeoutException ⇒
      // drop(3) skips the topmost frames added by the instrumentation itself — TODO confirm frame count.
      val message = stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL))
        .orElse(callInfo.map(_.message))
      publish(message)
  }

  /** Publishes a Warning on the system's event stream, if a message is available. */
  def publish(message: Option[String])(implicit system: ActorSystem): Unit = message foreach { msg ⇒
    system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
      s"Timeout triggered for ask pattern registered at: $msg"))
  }
}

object AskPatternInstrumentation {
  type ErrorHandler = PartialFunction[Throwable, Unit]

  /** Captures a stack trace at `ask` time, used by the "heavyweight" strategy. */
  class StackTraceCaptureException extends Throwable

  /** Call-site information (actor name + source location), used by the "lightweight" strategy. */
  case class CallInfo(name: String, sourceLocation: SourceLocation) {
    def message: String = {
      // sourceLocation may be null depending on the weaver's settings.
      val locationInfo = Option(sourceLocation)
        .map(location ⇒ s"${location.getFileName}:${location.getLine}")
        .getOrElse("<unknown position>")
      s"$name @ $locationInfo"
    }
  }
}
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package akka.kamon.instrumentation

import java.lang.reflect.Method
import java.util.concurrent.ThreadPoolExecutor

import akka.actor.{ ActorSystemImpl, Cancellable }
import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher }
import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
import kamon.Kamon
import kamon.akka.DispatcherMetrics
import DispatcherMetrics.DispatcherMetricRecorder
import kamon.metric.Metrics
import org.aspectj.lang.annotation._

import scala.concurrent.forkjoin.ForkJoinPool

/**
 * Instruments dispatcher creation, lookup and shutdown: wires the owning ActorSystem into each
 * dispatcher, registers its metric recorder and schedules/cancels the periodic gauge collection.
 */
@Aspect
class DispatcherInstrumentation {

  @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
  def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}

  /** Remembers the owning ActorSystem on the Dispatchers factory (state added by the mixin below). */
  @Before("onActorSystemStartup(dispatchers, system)")
  def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = {
    val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem]
    currentDispatchers.actorSystem = system
  }

  @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)")
  def onDispatchersLookup(dispatchers: Dispatchers) = {}

  /** Propagates the ActorSystem from the Dispatchers factory to every looked-up dispatcher. */
  @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
  def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
    val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
    val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]

    dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
  }

  @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))")
  def onCreateExecutorService(): Unit = {}

  @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))")
  def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {}

  @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)")
  def onDispatcherStartup(dispatcher: Dispatcher): Unit = {}

  /**
   * Registers the dispatcher's metric recorder when its executor service is created and, if the
   * registration succeeds (i.e. metrics are enabled for this dispatcher), schedules the periodic
   * collection of pool gauges.
   */
  @After("onDispatcherStartup(dispatcher)")
  def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {

    val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
    val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
    val metricIdentity = DispatcherMetrics(dispatcher.id)

    dispatcherWithMetrics.metricIdentity = metricIdentity
    dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory)

    if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) {
      dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
        // foreach (not map): recording is a side effect, the result is discarded.
        dispatcherWithMetrics.dispatcherMetricsRecorder.foreach { recorder ⇒
          val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) =
            DispatcherMetricsCollector.collect(dispatcher)

          recorder.maximumPoolSize.record(maximumPoolSize)
          recorder.runningThreadCount.record(runningThreadCount)
          recorder.queueTaskCount.record(queueTaskCount)
          recorder.poolSize.record(poolSize)
        }
      }
    }
  }

  @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)")
  def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {}

  /** Cancels the gauge collection and unregisters the metrics when a tracked dispatcher shuts down. */
  @After("onDispatcherShutdown(dispatcher)")
  def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
    val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]

    // Only clean up if a recorder was ever registered for this dispatcher.
    dispatcherWithMetrics.dispatcherMetricsRecorder.foreach { _ ⇒
      dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
      Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
    }
  }
}

/** Mixes the metric-collection state holders below into Akka's dispatcher classes. */
@Aspect
class DispatcherMetricCollectionInfoIntoDispatcherMixin {

  @DeclareMixin("akka.dispatch.MessageDispatcher")
  def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {}

  @DeclareMixin("akka.dispatch.Dispatchers")
  def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
}

// State attached to every MessageDispatcher. NOTE: all fields (including the Option) are null
// until the startup advice runs — readers must not assume dispatcherMetricsRecorder is non-null.
trait DispatcherMetricCollectionInfo {
  var metricIdentity: DispatcherMetrics = _
  var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
  var dispatcherCollectorCancellable: Cancellable = _
  var actorSystem: ActorSystemImpl = _
}

// State attached to the Dispatchers factory: the ActorSystem that created it.
trait DispatchersWithActorSystem {
  var actorSystem: ActorSystemImpl = _
}

object DispatcherMetricsCollector {

  case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)

  private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = {
    DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
      (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
  }

  private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
    DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
  }

  // `executorService` is protected in Dispatcher, so it is accessed reflectively.
  private val executorServiceMethod: Method = {
    val method = classOf[Dispatcher].getDeclaredMethod("executorService")
    method.setAccessible(true)
    method
  }

  /**
   * Extracts pool/thread/queue gauges from the dispatcher's underlying executor. Returns all
   * zeros for dispatchers or executors of types it does not know how to inspect.
   */
  def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = {
    dispatcher match {
      case x: Dispatcher ⇒
        // Unwrap Akka's lazy ExecutorServiceDelegate to reach the real pool.
        val executor = executorServiceMethod.invoke(x) match {
          case delegate: ExecutorServiceDelegate ⇒ delegate.executor
          case other                             ⇒ other
        }

        executor match {
          case fjp: ForkJoinPool       ⇒ collectForkJoinMetrics(fjp)
          case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
          case anything                ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
        }
      case _ ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
    }
  }
}