From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../src/main/scala/kamon/akka/ActorMetrics.scala | 93 ++------- .../src/main/scala/kamon/akka/AkkaExtension.scala | 8 +- .../main/scala/kamon/akka/DispatcherMetrics.scala | 104 +++++----- .../src/main/scala/kamon/akka/RouterMetrics.scala | 87 ++------- .../instrumentation/ActorCellInstrumentation.scala | 211 ++++++++++++++++++++ .../ActorLoggingInstrumentation.scala | 50 +++++ .../ActorSystemMessageInstrumentation.scala | 80 ++++++++ .../AskPatternInstrumentation.scala | 92 +++++++++ .../DispatcherInstrumentation.scala | 168 ++++++++++++++++ .../akka/ActorCellInstrumentation.scala | 213 --------------------- .../akka/ActorLoggingInstrumentation.scala | 50 ----- .../akka/ActorSystemMessageInstrumentation.scala | 80 -------- .../akka/AskPatternInstrumentation.scala | 83 -------- .../akka/DispatcherInstrumentation.scala | 164 ---------------- 14 files changed, 693 insertions(+), 790 deletions(-) create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala delete mode 100644 kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala delete mode 100644 kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala delete mode 100644 kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala delete mode 100644 kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala delete mode 100644 kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala (limited to 'kamon-akka/src/main/scala') diff --git a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala index b22f7fa9..c99df586 100644 --- a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala @@ -16,79 +16,26 @@ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric._ -import kamon.metric.instrument.{ Counter, Histogram, MinMaxCounter } +import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder } +import kamon.metric.instrument.{ Time, InstrumentFactory } -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, - errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): ActorMetricSnapshot = - 
ActorMetricSnapshot( - processingTime.collect(context), - timeInMailbox.collect(context), - mailboxSize.collect(context), - errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - mailboxSize.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, - mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = ActorMetricSnapshot - - def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - mailboxSize.merge(that.mailboxSize, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (Errors -> errors)) - } - - val Factory = ActorMetricGroupFactory +/** + * Entity recorder for Akka Actors. The metrics being tracked are: + * + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - mailbox-size: Size of the actor's mailbox. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class ActorMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val mailboxSize = minMaxCounter("mailbox-size") + val errors = counter("errors") } -case object ActorMetricGroupFactory extends MetricGroupFactory { - import kamon.akka.ActorMetrics._ - - type GroupRecorder = ActorMetricsRecorder - - def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - val mailboxSizeConfig = settings.getConfig("mailbox-size") - - new ActorMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - MinMaxCounter.fromConfig(mailboxSizeConfig, system), - Counter()) - } -} +object ActorMetrics extends EntityRecorderFactory[ActorMetrics] { + def category: String = "akka-actor" + def createRecorder(instrumentFactory: InstrumentFactory): ActorMetrics = new ActorMetrics(instrumentFactory) +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index bc013b63..cbca7db6 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -16,8 +16,8 @@ package kamon.akka -import akka.actor -import akka.actor._ +import _root_.akka.actor +import _root_.akka.actor._ import kamon._ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -29,4 +29,6 @@ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Akka def createExtension(system: ExtendedActorSystem): 
AkkaExtension = new AkkaExtension(system) -} \ No newline at end of file + +} + diff --git a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala index 64e16f96..acf92e70 100644 --- a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala @@ -16,79 +16,71 @@ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config +import java.util.concurrent.ThreadPoolExecutor + +import _root_.akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool import kamon.metric._ -import kamon.metric.instrument.Histogram +import kamon.metric.instrument.{ DifferentialValueCollector, InstrumentFactory } -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} +class ForkJoinPoolDispatcherMetrics(fjp: AkkaForkJoinPool, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val paralellism = minMaxCounter("parallelism") + paralellism.increment(fjp.getParallelism) // Steady value. -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" + val poolSize = gauge("pool-size", () ⇒ { + fjp.getPoolSize.toLong + }) - case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name = "pool-size" } + val activeThreads = gauge("active-threads", () ⇒ { + fjp.getActiveThreadCount.toLong + }) - case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, - queueTaskCount: Histogram, poolSize: Histogram) - extends MetricGroupRecorder { + val runningThreads = gauge("running-threads", () ⇒ { + fjp.getRunningThreadCount.toLong + }) - def collect(context: CollectionContext): MetricGroupSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.collect(context), - runningThreadCount.collect(context), - queueTaskCount.collect(context), - poolSize.collect(context)) + val queuedTaskCount = gauge("queued-task-count", () ⇒ { + fjp.getQueuedTaskCount + }) +} - def cleanup: Unit = {} +object ForkJoinPoolDispatcherMetrics { + def factory(fjp: AkkaForkJoinPool) = new EntityRecorderFactory[ForkJoinPoolDispatcherMetrics] { + def category: String = AkkaDispatcherMetrics.Category + def createRecorder(instrumentFactory: InstrumentFactory) = new ForkJoinPoolDispatcherMetrics(fjp, instrumentFactory) } +} - case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, - queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { +class ThreadPoolExecutorDispatcherMetrics(tpe: ThreadPoolExecutor, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val corePoolSize = gauge("core-pool-size", () ⇒ { + tpe.getCorePoolSize.toLong + }) - type GroupSnapshotType = DispatcherMetricSnapshot + val maxPoolSize = gauge("max-pool-size", () ⇒ { + tpe.getMaximumPoolSize.toLong + }) - def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.merge(that.maximumPoolSize, context), - runningThreadCount.merge(that.runningThreadCount, context), - queueTaskCount.merge(that.queueTaskCount, context), 
- poolSize.merge(that.poolSize, context)) + val poolSize = gauge("pool-size", () ⇒ { + tpe.getPoolSize.toLong + }) - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } + val activeThreads = gauge("active-threads", () ⇒ { + tpe.getActiveCount.toLong + }) - val Factory = DispatcherMetricGroupFactory + val processedTasks = gauge("processed-tasks", DifferentialValueCollector(() ⇒ { + tpe.getTaskCount + })) } -case object DispatcherMetricGroupFactory extends MetricGroupFactory { +object ThreadPoolExecutorDispatcherMetrics { - import kamon.akka.DispatcherMetrics._ - - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") - val runningThreadCountConfig = settings.getConfig("running-thread-count") - val queueTaskCountConfig = settings.getConfig("queued-task-count") - val poolSizeConfig = settings.getConfig("pool-size") - - new DispatcherMetricRecorder( - Histogram.fromConfig(maximumPoolSizeConfig), - Histogram.fromConfig(runningThreadCountConfig), - Histogram.fromConfig(queueTaskCountConfig), - Histogram.fromConfig(poolSizeConfig)) + def factory(tpe: ThreadPoolExecutor) = new EntityRecorderFactory[ThreadPoolExecutorDispatcherMetrics] { + def category: String = AkkaDispatcherMetrics.Category + def createRecorder(instrumentFactory: InstrumentFactory) = new ThreadPoolExecutorDispatcherMetrics(tpe, instrumentFactory) } +} +object AkkaDispatcherMetrics { + val Category = "akka-dispatcher" } diff --git a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala index 2eedf764..5c5bb05a 100644 --- a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala @@ -15,75 +15,26 @@ */ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config import kamon.metric._ -import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory } -case class RouterMetrics(name: String) extends MetricGroupIdentity { - val category = RouterMetrics -} - -object RouterMetrics extends MetricGroupCategory { - val name = "router" - - case object RoutingTime extends MetricIdentity { val name = "routing-time" } - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class RouterMetricsRecorder(routingTime: Histogram, processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(routingTime.collect(context), processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) - - def cleanup: Unit = { - routingTime.cleanup - processingTime.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class RouterMetricSnapshot(routingTime: Histogram.Snapshot, processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = RouterMetricSnapshot - - def 
merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot( - routingTime.merge(that.routingTime, context), - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - RoutingTime -> routingTime, - ProcessingTime -> processingTime, - TimeInMailbox -> timeInMailbox, - Errors -> errors) - } - - val Factory = RouterMetricGroupFactory -} - -case object RouterMetricGroupFactory extends MetricGroupFactory { - - import kamon.akka.RouterMetrics._ - - type GroupRecorder = RouterMetricsRecorder - - def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { - val settings = config.getConfig("precision.router") - - val routingTimeConfig = settings.getConfig("routing-time") - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - - new RouterMetricsRecorder( - Histogram.fromConfig(routingTimeConfig), - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - Counter()) - } +/** + * Entity recorder for Akka Routers. The metrics being tracked are: + * + * - routing-time: Time taken for the router to process the routing logic. + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class RouterMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val routingTime = histogram("routing-time", Time.Nanoseconds) + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val errors = counter("errors") } +object RouterMetrics extends EntityRecorderFactory[RouterMetrics] { + def category: String = "akka-router" + def createRecorder(instrumentFactory: InstrumentFactory): RouterMetrics = new RouterMetrics(instrumentFactory) +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala new file mode 100644 index 00000000..c961737d --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala @@ -0,0 +1,211 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import akka.actor._ +import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell +import kamon.akka.{ RouterMetrics, ActorMetrics } +import kamon.metric.{ Metrics, Entity } +import kamon.trace._ +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorCellInstrumentation { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") + def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + Metrics.get(system).register(ActorMetrics, ref.path.elements.mkString("/")).map { registration ⇒ + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellMetrics.entity = registration.entity + cellMetrics.recorder = Some(registration.recorder) + } + + } + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] + + try { + TraceContext.withContext(contextAndTimestamp.traceContext) { + pjp.proceed() + } + } finally { + cellMetrics.recorder.map { am ⇒ + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + + am.processingTime.record(processingTime) + am.timeInMailbox.record(timeInMailbox) + am.mailboxSize.decrement() + + // In case that this actor is behind a router, record the metrics for the router. + envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) + } + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {} + + @After("sendMessageInActorCell(cell, envelope)") + def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + cellMetrics.recorder.map(_.mailboxSize.increment()) + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: ActorCell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: ActorCell): Unit = { + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + cellMetrics.recorder.map { _ ⇒ + Metrics.get(cell.system).unregister(cellMetrics.entity) + } + + // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. 
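     // NOTE (editorial): RoutedActorCell extends ActorCell, so this same advice also runs when a router
     // stops; the branch below additionally unregisters the router's RouterMetrics entity.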
+ if (cell.isInstanceOf[RoutedActorCell]) { + val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + routedCellMetrics.routerRecorder.map { _ ⇒ + Metrics.get(cell.system).unregister(routedCellMetrics.routerEntity) + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") + def actorInvokeFailure(cell: ActorCell): Unit = {} + + @Before("actorInvokeFailure(cell)") + def beforeInvokeFailure(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + cellWithMetrics.recorder.map(_.errors.increment()) + + // In case that this actor is behind a router, count the errors for the router as well. + val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] + envelope.routerMetricsRecorder.map(_.errors.increment()) + } +} + +@Aspect +class RoutedActorCellInstrumentation { + + @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)") + def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {} + + @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") + def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { + Metrics.get(system).register(RouterMetrics, ref.path.elements.mkString("/")).map { registration ⇒ + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + + cellMetrics.routerEntity = registration.entity + cellMetrics.routerRecorder = Some(registration.recorder) + } + } + + @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {} + + @Around("sendMessageInRouterActorCell(cell, envelope)") + def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = { + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] + + try { + TraceContext.withContext(contextAndTimestamp.traceContext) { + + // The router metrics recorder will only be picked up if the message is sent from a tracked router. 
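           // NOTE (editorial): the recorder travels through RouterAwareEnvelope.dynamicRouterMetricsRecorder,
           // so any Envelope created while routing (mixed in via RouterAwareEnvelope.default) captures it.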
+ RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerRecorder) { + pjp.proceed() + } + } + } finally { + cellMetrics.routerRecorder map { routerRecorder ⇒ + routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) + } + } + } +} + +trait ActorCellMetrics { + var entity: Entity = _ + var recorder: Option[ActorMetrics] = None +} + +trait RoutedActorCellMetrics { + var routerEntity: Entity = _ + var routerRecorder: Option[RouterMetrics] = None +} + +trait RouterAwareEnvelope { + def routerMetricsRecorder: Option[RouterMetrics] +} + +object RouterAwareEnvelope { + import scala.util.DynamicVariable + private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetrics]](None) + + def default: RouterAwareEnvelope = new RouterAwareEnvelope { + val routerMetricsRecorder: Option[RouterMetrics] = dynamicRouterMetricsRecorder.value + } +} + +@Aspect +class MetricsIntoActorCellsMixin { + + @DeclareMixin("akka.actor.ActorCell") + def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} + + @DeclareMixin("akka.routing.RoutedActorCell") + def mixinActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} + +} + +@Aspect +class TraceContextIntoEnvelopeMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default + + @DeclareMixin("akka.dispatch.Envelope") + def mixinRouterAwareToEnvelope: RouterAwareEnvelope = RouterAwareEnvelope.default + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TimestampedTraceContextAware with RouterAwareEnvelope): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + ctx.routerMetricsRecorder + } +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..dd998c6b --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala @@ -0,0 +1,50 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import kamon.trace.logging.MdcKeysSupport +import kamon.trace.{ TraceContext, TraceContextAware } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorLoggingInstrumentation extends MdcKeysSupport { + + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") + def logEventCreation(event: TraceContextAware): Unit = {} + + @After("logEventCreation(event)") + def captureTraceContext(event: TraceContextAware): Unit = { + // Force initialization of TraceContextAware + event.traceContext + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { + TraceContext.withContext(logEvent.traceContext) { + withMdc { + pjp.proceed() + } + } + } +} diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala new file mode 100644 index 00000000..0cb4ef13 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala @@ -0,0 +1,80 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import kamon.trace.{ TraceContext, TraceContextAware } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorSystemMessageInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext + TraceContext.withContext(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} + +@Aspect +class TraceContextIntoSystemMessageMixin { + + @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") + def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +@Aspect +class TraceContextIntoRepointableActorRefMixin { + + @DeclareMixin("akka.actor.RepointableActorRef") + def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } + + @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") + def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} + + @Around("repointableActorRefCreation(repointableActorRef)") + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { + TraceContext.withContext(repointableActorRef.traceContext) { + pjp.proceed() + } + } +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala new file mode 100644 index 00000000..28bfcae9 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala @@ -0,0 +1,92 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import akka.util.Timeout +import kamon.Kamon +import kamon.akka.Akka +import kamon.trace.{ TraceContext, EmptyTraceContext, TraceContextAware } +import akka.actor.{ InternalActorRef, ActorSystem, ActorRef } +import akka.event.Logging.Warning +import akka.pattern.{ PromiseActorRef, AskTimeoutException } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ +import org.aspectj.lang.reflect.SourceLocation +import scala.concurrent.Future +import scala.compat.Platform.EOL +import scala.concurrent.duration.FiniteDuration + +@Aspect +class AskPatternInstrumentation { + + import AskPatternInstrumentation._ + + @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, timeout)") + def askableActorRefAsk(actor: ActorRef, timeout: Timeout): Unit = {} + + @Around("askableActorRefAsk(actor, timeout)") + def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef = + TraceContext.map { ctx ⇒ + actor match { + // the AskPattern will only work for InternalActorRef's with these conditions. + case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒ + val akkaExtension = ctx.lookupExtension(Akka) + val future = pjp.proceed().asInstanceOf[Future[AnyRef]] + val system = ref.provider.guardian.underlying.system + + val handler = akkaExtension.askPatternTimeoutWarning match { + case "off" ⇒ None + case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) + case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) + } + + handler.map(future.onFailure(_)(akkaExtension.dispatcher)) + future + + case _ ⇒ pjp.proceed() // + } + + } getOrElse (pjp.proceed()) + + def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { + case e: AskTimeoutException ⇒ + val message = { + if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL)) + else callInfo.map(_.message) + } + publish(message) + } + + def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒ + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], + s"Timeout triggered for ask pattern registered at: $msg")) + } +} + +object AskPatternInstrumentation { + type ErrorHandler = PartialFunction[Throwable, Unit] + + class StackTraceCaptureException extends Throwable + + case class CallInfo(name: String, sourceLocation: SourceLocation) { + def message: String = { + def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("") + def line: String = s"$name @ $locationInfo" + s"$line" + } + } +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala new file mode 100644 index 00000000..f4bc31c4 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -0,0 +1,168 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); 
you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } + +import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import akka.dispatch._ +import akka.kamon.instrumentation.LookupDataAware.LookupData +import kamon.akka.{ AkkaDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics, ForkJoinPoolDispatcherMetrics } +import kamon.metric.{ Metrics, Entity } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class DispatcherInstrumentation { + + @Pointcut("execution(* akka.actor.ActorSystemImpl.start(..)) && this(system)") + def actorSystemInitialization(system: ActorSystemImpl): Unit = {} + + @Before("actorSystemInitialization(system)") + def afterActorSystemInitialization(system: ActorSystemImpl): Unit = { + system.dispatchers.asInstanceOf[ActorSystemAware].actorSystem = system + + // The default dispatcher for the actor system is looked up in the ActorSystemImpl's initialization code and we + // can't get the Metrics extension there since the ActorSystem is not yet fully constructed. To workaround that + // we are manually selecting and registering the default dispatcher with the Metrics extension. All other dispatchers + // will by registered by the instrumentation bellow. + + // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup. + val defaultDispatcher = system.dispatcher + val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate") + executorServiceDelegateField.setAccessible(true) + + val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher) + val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor") + executorField.setAccessible(true) + + val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService] + registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + } + + private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = + executorService match { + case fjp: AkkaForkJoinPool ⇒ + Metrics.get(system).register(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName) + + case tpe: ThreadPoolExecutor ⇒ + Metrics.get(system).register(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName) + + case others ⇒ // Currently not interested in other kinds of dispatchers. 
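       // NOTE (editorial): only AkkaForkJoinPool and ThreadPoolExecutor expose the pool statistics read by the
       // recorders defined in DispatcherMetrics.scala; other ExecutorService implementations stay uninstrumented.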
+ } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers) && args(dispatcherName)") + def dispatchersLookup(dispatchers: ActorSystemAware, dispatcherName: String) = {} + + @Around("dispatchersLookup(dispatchers, dispatcherName)") + def aroundDispatchersLookup(pjp: ProceedingJoinPoint, dispatchers: ActorSystemAware, dispatcherName: String): Any = + LookupDataAware.withLookupData(LookupData(dispatcherName, dispatchers.actorSystem)) { + pjp.proceed() + } + + @Pointcut("initialization(akka.dispatch.ExecutorServiceFactory.new(..)) && target(factory)") + def executorServiceFactoryInitialization(factory: LookupDataAware): Unit = {} + + @After("executorServiceFactoryInitialization(factory)") + def afterExecutorServiceFactoryInitialization(factory: LookupDataAware): Unit = + factory.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactory+.createExecutorService()) && this(factory) && !cflow(execution(* akka.dispatch.Dispatcher.shutdown()))") + def createExecutorService(factory: LookupDataAware): Unit = {} + + @AfterReturning(pointcut = "createExecutorService(factory)", returning = "executorService") + def afterCreateExecutorService(factory: LookupDataAware, executorService: ExecutorService): Unit = { + val lookupData = factory.lookupData + + // lookupData.actorSystem will be null only during the first lookup of the default dispatcher during the + // ActorSystemImpl's initialization. + if (lookupData.actorSystem != null) + registerDispatcher(lookupData.dispatcherName, executorService, lookupData.actorSystem) + } + + @Pointcut("initialization(akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.new(..)) && this(lazyExecutor)") + def lazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorInitialization(lazyExecutor)") + def afterLazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = + lazyExecutor.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.copy()) && this(lazyExecutor)") + def lazyExecutorCopy(lazyExecutor: LookupDataAware): Unit = {} + + @Around("lazyExecutorCopy(lazyExecutor)") + def aroundLazyExecutorCopy(pjp: ProceedingJoinPoint, lazyExecutor: LookupDataAware): Any = + LookupDataAware.withLookupData(lazyExecutor.lookupData) { + pjp.proceed() + } + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.shutdown()) && this(lazyExecutor)") + def lazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorShutdown(lazyExecutor)") + def afterLazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = { + import lazyExecutor.lookupData + + if (lookupData.actorSystem != null) + Metrics.get(lookupData.actorSystem).unregister(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category)) + } + +} + +@Aspect +class DispatcherMetricCollectionInfoIntoDispatcherMixin { + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinActorSystemAwareToDispatchers: ActorSystemAware = ActorSystemAware() + + @DeclareMixin("akka.dispatch.Dispatcher.LazyExecutorServiceDelegate") + def mixinLookupDataAwareToExecutors: LookupDataAware = LookupDataAware() + + @DeclareMixin("akka.dispatch.ExecutorServiceFactory+") + def mixinActorSystemAwareToDispatcher: LookupDataAware = LookupDataAware() +} + +trait ActorSystemAware { + @volatile var actorSystem: ActorSystem = _ +} + +object ActorSystemAware { + def apply(): ActorSystemAware = new ActorSystemAware {} +} + 
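// NOTE (editorial): a minimal sketch, not part of this commit, of how the aspects above fit together once woven.
// The "my-dispatcher" configuration key is illustrative only:
//
//   val system = ActorSystem("example")
//   // ActorSystemImpl.start() registers Dispatchers.DefaultDispatcherId through registerDispatcher(...)
//   system.dispatchers.lookup("my-dispatcher")
//   // aroundDispatchersLookup publishes LookupData("my-dispatcher", system) via the thread-local kept in
//   // LookupDataAware below; afterCreateExecutorService then registers the new executor service under the
//   // "akka-dispatcher" entity category.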
+trait LookupDataAware { + @volatile var lookupData: LookupData = _ +} + +object LookupDataAware { + case class LookupData(dispatcherName: String, actorSystem: ActorSystem) + + private val _currentDispatcherLookupData = new ThreadLocal[LookupData] + + def apply() = new LookupDataAware {} + + def currentLookupData: LookupData = _currentDispatcherLookupData.get() + + def withLookupData[T](lookupData: LookupData)(thunk: ⇒ T): T = { + _currentDispatcherLookupData.set(lookupData) + val result = thunk + _currentDispatcherLookupData.remove() + + result + } +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala deleted file mode 100644 index 78d88583..00000000 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import akka.routing.RoutedActorCell -import kamon.Kamon -import kamon.akka.{ RouterMetrics, ActorMetrics } -import ActorMetrics.ActorMetricsRecorder -import RouterMetrics.RouterMetricsRecorder -import kamon.metric.Metrics -import kamon.trace._ -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorCellInstrumentation { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellMetrics.actorMetricIdentity = metricIdentity - cellMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - } - - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = 
envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellMetrics.actorMetricsRecorder.map { am ⇒ - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime - - am.processingTime.record(processingTime) - am.timeInMailbox.record(timeInMailbox) - am.mailboxSize.decrement() - - // In case that this actor is behind a router, record the metrics for the router. - envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)") - def sendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {} - - @After("sendMessageInActorCell(cell, envelope)") - def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.actorMetricsRecorder.map(_.mailboxSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.actorMetricsRecorder.map { _ ⇒ - Kamon(Metrics)(cell.system).unregister(cellMetrics.actorMetricIdentity) - } - - // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. - if (cell.isInstanceOf[RoutedActorCell]) { - val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - routedCellMetrics.routerMetricsRecorder.map { _ ⇒ - Kamon(Metrics)(cell.system).unregister(routedCellMetrics.routerMetricIdentity) - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(_.errors.increment()) - - // In case that this actor is behind a router, count the errors for the router as well. 
- val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] - envelope.routerMetricsRecorder.map(_.errors.increment()) - } -} - -@Aspect -class RoutedActorCellInstrumentation { - - @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)") - def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {} - - @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") - def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) - val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - - cellMetrics.routerMetricIdentity = metricIdentity - cellMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) - } - - @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") - def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {} - - @Around("sendMessageInRouterActorCell(cell, envelope)") - def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = { - val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { - - // The router metrics recorder will only be picked up if the message is sent from a tracked router. 
- RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerMetricsRecorder) { - pjp.proceed() - } - } - } finally { - cellMetrics.routerMetricsRecorder map { routerRecorder ⇒ - routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) - } - } - } -} - -trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ -} - -trait RoutedActorCellMetrics { - var routerMetricIdentity: RouterMetrics = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ -} - -trait RouterAwareEnvelope { - def routerMetricsRecorder: Option[RouterMetricsRecorder] -} - -object RouterAwareEnvelope { - import scala.util.DynamicVariable - private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetricsRecorder]](None) - - def default: RouterAwareEnvelope = new RouterAwareEnvelope { - val routerMetricsRecorder: Option[RouterMetricsRecorder] = dynamicRouterMetricsRecorder.value - } -} - -@Aspect -class MetricsIntoActorCellsMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} - - @DeclareMixin("akka.routing.RoutedActorCell") - def mixinActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} - -} - -@Aspect -class TraceContextIntoEnvelopeMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default - - @DeclareMixin("akka.dispatch.Envelope") - def mixinRouterAwareToEnvelope: RouterAwareEnvelope = RouterAwareEnvelope.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TimestampedTraceContextAware with RouterAwareEnvelope): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - ctx.routerMetricsRecorder - } -} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala deleted file mode 100644 index e0e5d316..00000000 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. 
- * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorLoggingInstrumentation extends MdcKeysSupport { - - @DeclareMixin("akka.event.Logging.LogEvent+") - def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") - def logEventCreation(event: TraceContextAware): Unit = {} - - @After("logEventCreation(event)") - def captureTraceContext(event: TraceContextAware): Unit = { - // Force initialization of TraceContextAware - event.traceContext - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { - withMdc { - pjp.proceed() - } - } - } -} diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala deleted file mode 100644 index 48016876..00000000 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. 
- * =========================================================================================
- */
-
-package akka.kamon.instrumentation
-
-import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class ActorSystemMessageInstrumentation {
-
-  @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
-  def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
-
-  @Around("systemMessageProcessing(messages)")
-  def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
-    if (messages.nonEmpty) {
-      val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext
-      TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed())
-
-    } else pjp.proceed()
-  }
-}
-
-@Aspect
-class TraceContextIntoSystemMessageMixin {
-
-  @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
-  def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default
-
-  @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
-  def envelopeCreation(ctx: TraceContextAware): Unit = {}
-
-  @After("envelopeCreation(ctx)")
-  def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
-    // Necessary to force the initialization of ContextAware at the moment of creation.
-    ctx.traceContext
-  }
-}
-
-@Aspect
-class TraceContextIntoRepointableActorRefMixin {
-
-  @DeclareMixin("akka.actor.RepointableActorRef")
-  def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default
-
-  @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
-  def envelopeCreation(ctx: TraceContextAware): Unit = {}
-
-  @After("envelopeCreation(ctx)")
-  def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
-    // Necessary to force the initialization of ContextAware at the moment of creation.
-    ctx.traceContext
-  }
-
-  @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)")
-  def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {}
-
-  @Around("repointableActorRefCreation(repointableActorRef)")
-  def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = {
-    TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) {
-      pjp.proceed()
-    }
-  }
-}
\ No newline at end of file
diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
deleted file mode 100644
index ebddbfc8..00000000
--- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.kamon.instrumentation
-
-import kamon.Kamon
-import kamon.akka.Akka
-import kamon.trace.{ TraceRecorder, TraceContext, EmptyTraceContext, TraceContextAware }
-import akka.actor.{ ActorSystem, ActorRef }
-import akka.event.Logging.Warning
-import akka.pattern.AskTimeoutException
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.reflect.SourceLocation
-import scala.concurrent.Future
-import scala.compat.Platform.EOL
-
-@Aspect
-class AskPatternInstrumentation {
-
-  import AskPatternInstrumentation._
-
-  @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, *)")
-  def askableActorRefAsk(actor: ActorRef): Unit = {}
-
-  @Around("askableActorRefAsk(actor)")
-  def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef): AnyRef =
-    TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
-      val akkaExtension = Kamon(Akka)(system)
-      val future = pjp.proceed().asInstanceOf[Future[AnyRef]]
-
-      val handler = akkaExtension.askPatternTimeoutWarning match {
-        case "off"         ⇒ None
-        case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system))
-        case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system))
-      }
-
-      handler.map(future.onFailure(_)(akkaExtension.dispatcher))
-      future
-
-    } getOrElse (pjp.proceed())
-
-  def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = {
-    case e: AskTimeoutException ⇒
-      val message = {
-        if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL))
-        else callInfo.map(_.message)
-      }
-      publish(message)
-  }
-
-  def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒
-    system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
-      s"Timeout triggered for ask pattern registered at: $msg"))
-  }
-}
-
-object AskPatternInstrumentation {
-  type ErrorHandler = PartialFunction[Throwable, Unit]
-
-  class StackTraceCaptureException extends Throwable
-
-  case class CallInfo(name: String, sourceLocation: SourceLocation) {
-    def message: String = {
-      def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("")
-      def line: String = s"$name @ $locationInfo"
-      s"$line"
-    }
-  }
-}
\ No newline at end of file
diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala
deleted file mode 100644
index 8280edca..00000000
--- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.kamon.instrumentation
-
-import java.lang.reflect.Method
-import java.util.concurrent.ThreadPoolExecutor
-
-import akka.actor.{ ActorSystemImpl, Cancellable }
-import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher }
-import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
-import kamon.Kamon
-import kamon.akka.DispatcherMetrics
-import DispatcherMetrics.DispatcherMetricRecorder
-import kamon.metric.Metrics
-import org.aspectj.lang.annotation._
-
-import scala.concurrent.forkjoin.ForkJoinPool
-
-@Aspect
-class DispatcherInstrumentation {
-
-  @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
-  def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}
-
-  @Before("onActorSystemStartup(dispatchers, system)")
-  def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = {
-    val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem]
-    currentDispatchers.actorSystem = system
-  }
-
-  @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)")
-  def onDispatchersLookup(dispatchers: Dispatchers) = {}
-
-  @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
-  def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
-    val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
-    val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
-
-    dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
-  }
-
-  @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))")
-  def onCreateExecutorService(): Unit = {}
-
-  @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))")
-  def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {}
-
-  @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)")
-  def onDispatcherStartup(dispatcher: Dispatcher): Unit = {}
-
-  @After("onDispatcherStartup(dispatcher)")
-  def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {
-
-    val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
-    val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
-    val metricIdentity = DispatcherMetrics(dispatcher.id)
-
-    dispatcherWithMetrics.metricIdentity = metricIdentity
-    dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory)
-
-    if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) {
-      dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
-        dispatcherWithMetrics.dispatcherMetricsRecorder.map {
-          dm ⇒
-            val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) =
-              DispatcherMetricsCollector.collect(dispatcher)
-
-            dm.maximumPoolSize.record(maximumPoolSize)
-            dm.runningThreadCount.record(runningThreadCount)
-            dm.queueTaskCount.record(queueTaskCount)
-            dm.poolSize.record(poolSize)
-        }
-      }
-    }
-  }
-
-  @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)")
-  def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {}
-
-  @After("onDispatcherShutdown(dispatcher)")
-  def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
-    val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
-
-    dispatcherWithMetrics.dispatcherMetricsRecorder.map {
-      dispatcher ⇒
-        dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
-        Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
-    }
-  }
-}
-
-@Aspect
-class DispatcherMetricCollectionInfoIntoDispatcherMixin {
-
-  @DeclareMixin("akka.dispatch.MessageDispatcher")
-  def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {}
-
-  @DeclareMixin("akka.dispatch.Dispatchers")
-  def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
-}
-
-trait DispatcherMetricCollectionInfo {
-  var metricIdentity: DispatcherMetrics = _
-  var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
-  var dispatcherCollectorCancellable: Cancellable = _
-  var actorSystem: ActorSystemImpl = _
-}
-
-trait DispatchersWithActorSystem {
-  var actorSystem: ActorSystemImpl = _
-}
-
-object DispatcherMetricsCollector {
-
-  case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)
-
-  private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = {
-    DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
-      (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
-  }
-
-  private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
-    DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
-  }
-
-  private val executorServiceMethod: Method = {
-    // executorService is protected
-    val method = classOf[Dispatcher].getDeclaredMethod("executorService")
-    method.setAccessible(true)
-    method
-  }
-
-  def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = {
-    dispatcher match {
-      case x: Dispatcher ⇒ {
-        val executor = executorServiceMethod.invoke(x) match {
-          case delegate: ExecutorServiceDelegate ⇒ delegate.executor
-          case other                             ⇒ other
-        }
-
-        executor match {
-          case fjp: ForkJoinPool       ⇒ collectForkJoinMetrics(fjp)
-          case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
-          case anything                ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
-        }
-      }
-      case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
-    }
-  }
-}
--
cgit v1.2.3
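The deleted DispatcherMetricsCollector above boils down to reading four pool gauges from whichever executor backs a dispatcher, degrading to zeros for unknown pool types. A minimal, dependency-free sketch of that sampling logic follows, assuming only the JDK executors; PoolMeasurement, measure and the fixed-size pool used in main are illustrative names for this sketch, not part of the Kamon or Akka API.

import java.util.concurrent.{ Executors, ForkJoinPool, ThreadPoolExecutor }

object PoolMeasurementSketch {

  // Mirrors the shape of DispatcherMetricsMeasurement in the deleted collector.
  case class PoolMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)

  def measure(executor: Any): PoolMeasurement = executor match {
    case fjp: ForkJoinPool ⇒ // same readings the deleted collector takes for fork-join pools
      PoolMeasurement(fjp.getParallelism, fjp.getActiveThreadCount,
        fjp.getQueuedTaskCount + fjp.getQueuedSubmissionCount, fjp.getPoolSize)
    case tpe: ThreadPoolExecutor ⇒ // same readings it takes for thread-pool executors
      PoolMeasurement(tpe.getMaximumPoolSize, tpe.getActiveCount, tpe.getQueue.size(), tpe.getPoolSize)
    case _ ⇒ // unknown executor types fall back to zeros, as in the deleted code
      PoolMeasurement(0L, 0L, 0L, 0L)
  }

  def main(args: Array[String]): Unit = {
    // newFixedThreadPool is backed by a ThreadPoolExecutor, so the cast is safe here.
    val pool = Executors.newFixedThreadPool(2).asInstanceOf[ThreadPoolExecutor]
    (1 to 4).foreach(_ ⇒ pool.execute(new Runnable { def run(): Unit = Thread.sleep(200) }))
    println(measure(pool)) // expect something like PoolMeasurement(2, 2, 2, 2)
    pool.shutdown()
  }
}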