diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-akka/src/main/scala | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-akka/src/main/scala')
10 files changed, 321 insertions, 418 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala index b22f7fa9..c99df586 100644 --- a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala @@ -16,79 +16,26 @@ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric._ -import kamon.metric.instrument.{ Counter, Histogram, MinMaxCounter } +import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder } +import kamon.metric.instrument.{ Time, InstrumentFactory } -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, - errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.collect(context), - timeInMailbox.collect(context), - mailboxSize.collect(context), - errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - mailboxSize.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, - mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = ActorMetricSnapshot - - def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - mailboxSize.merge(that.mailboxSize, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (Errors -> errors)) - } - - val Factory = ActorMetricGroupFactory +/** + * Entity recorder for Akka Actors. The metrics being tracked are: + * + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - mailbox-size: Size of the actor's mailbox. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class ActorMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val mailboxSize = minMaxCounter("mailbox-size") + val errors = counter("errors") } -case object ActorMetricGroupFactory extends MetricGroupFactory { - import kamon.akka.ActorMetrics._ - - type GroupRecorder = ActorMetricsRecorder - - def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - val mailboxSizeConfig = settings.getConfig("mailbox-size") - - new ActorMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - MinMaxCounter.fromConfig(mailboxSizeConfig, system), - Counter()) - } -} +object ActorMetrics extends EntityRecorderFactory[ActorMetrics] { + def category: String = "akka-actor" + def createRecorder(instrumentFactory: InstrumentFactory): ActorMetrics = new ActorMetrics(instrumentFactory) +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index bc013b63..cbca7db6 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -16,8 +16,8 @@ package kamon.akka -import akka.actor -import akka.actor._ +import _root_.akka.actor +import _root_.akka.actor._ import kamon._ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -29,4 +29,6 @@ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Akka def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) -}
\ No newline at end of file + +} + diff --git a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala index 64e16f96..acf92e70 100644 --- a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala @@ -16,79 +16,71 @@ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config +import java.util.concurrent.ThreadPoolExecutor + +import _root_.akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool import kamon.metric._ -import kamon.metric.instrument.Histogram +import kamon.metric.instrument.{ DifferentialValueCollector, InstrumentFactory } -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} +class ForkJoinPoolDispatcherMetrics(fjp: AkkaForkJoinPool, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val paralellism = minMaxCounter("parallelism") + paralellism.increment(fjp.getParallelism) // Steady value. -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" + val poolSize = gauge("pool-size", () ⇒ { + fjp.getPoolSize.toLong + }) - case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name = "pool-size" } + val activeThreads = gauge("active-threads", () ⇒ { + fjp.getActiveThreadCount.toLong + }) - case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, - queueTaskCount: Histogram, poolSize: Histogram) - extends MetricGroupRecorder { + val runningThreads = gauge("running-threads", () ⇒ { + fjp.getRunningThreadCount.toLong + }) - def collect(context: CollectionContext): MetricGroupSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.collect(context), - runningThreadCount.collect(context), - queueTaskCount.collect(context), - poolSize.collect(context)) + val queuedTaskCount = gauge("queued-task-count", () ⇒ { + fjp.getQueuedTaskCount + }) +} - def cleanup: Unit = {} +object ForkJoinPoolDispatcherMetrics { + def factory(fjp: AkkaForkJoinPool) = new EntityRecorderFactory[ForkJoinPoolDispatcherMetrics] { + def category: String = AkkaDispatcherMetrics.Category + def createRecorder(instrumentFactory: InstrumentFactory) = new ForkJoinPoolDispatcherMetrics(fjp, instrumentFactory) } +} - case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, - queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { +class ThreadPoolExecutorDispatcherMetrics(tpe: ThreadPoolExecutor, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val corePoolSize = gauge("core-pool-size", () ⇒ { + tpe.getCorePoolSize.toLong + }) - type GroupSnapshotType = DispatcherMetricSnapshot + val maxPoolSize = gauge("max-pool-size", () ⇒ { + tpe.getMaximumPoolSize.toLong + }) - def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.merge(that.maximumPoolSize, context), - runningThreadCount.merge(that.runningThreadCount, context), - queueTaskCount.merge(that.queueTaskCount, context), - poolSize.merge(that.poolSize, context)) + val poolSize = gauge("pool-size", () ⇒ { + tpe.getPoolSize.toLong + }) - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } + val activeThreads = gauge("active-threads", () ⇒ { + tpe.getActiveCount.toLong + }) - val Factory = DispatcherMetricGroupFactory + val processedTasks = gauge("processed-tasks", DifferentialValueCollector(() ⇒ { + tpe.getTaskCount + })) } -case object DispatcherMetricGroupFactory extends MetricGroupFactory { +object ThreadPoolExecutorDispatcherMetrics { - import kamon.akka.DispatcherMetrics._ - - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") - val runningThreadCountConfig = settings.getConfig("running-thread-count") - val queueTaskCountConfig = settings.getConfig("queued-task-count") - val poolSizeConfig = settings.getConfig("pool-size") - - new DispatcherMetricRecorder( - Histogram.fromConfig(maximumPoolSizeConfig), - Histogram.fromConfig(runningThreadCountConfig), - Histogram.fromConfig(queueTaskCountConfig), - Histogram.fromConfig(poolSizeConfig)) + def factory(tpe: ThreadPoolExecutor) = new EntityRecorderFactory[ThreadPoolExecutorDispatcherMetrics] { + def category: String = AkkaDispatcherMetrics.Category + def createRecorder(instrumentFactory: InstrumentFactory) = new ThreadPoolExecutorDispatcherMetrics(tpe, instrumentFactory) } +} +object AkkaDispatcherMetrics { + val Category = "akka-dispatcher" } diff --git a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala index 2eedf764..5c5bb05a 100644 --- a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala @@ -15,75 +15,26 @@ */ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config import kamon.metric._ -import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory } -case class RouterMetrics(name: String) extends MetricGroupIdentity { - val category = RouterMetrics -} - -object RouterMetrics extends MetricGroupCategory { - val name = "router" - - case object RoutingTime extends MetricIdentity { val name = "routing-time" } - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class RouterMetricsRecorder(routingTime: Histogram, processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(routingTime.collect(context), processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) - - def cleanup: Unit = { - routingTime.cleanup - processingTime.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class RouterMetricSnapshot(routingTime: Histogram.Snapshot, processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = RouterMetricSnapshot - - def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot( - routingTime.merge(that.routingTime, context), - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - RoutingTime -> routingTime, - ProcessingTime -> processingTime, - TimeInMailbox -> timeInMailbox, - Errors -> errors) - } - - val Factory = RouterMetricGroupFactory -} - -case object RouterMetricGroupFactory extends MetricGroupFactory { - - import kamon.akka.RouterMetrics._ - - type GroupRecorder = RouterMetricsRecorder - - def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { - val settings = config.getConfig("precision.router") - - val routingTimeConfig = settings.getConfig("routing-time") - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - - new RouterMetricsRecorder( - Histogram.fromConfig(routingTimeConfig), - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - Counter()) - } +/** + * Entity recorder for Akka Routers. The metrics being tracked are: + * + * - routing-time: Time taken for the router to process the routing logic. + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class RouterMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val routingTime = histogram("routing-time", Time.Nanoseconds) + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val errors = counter("errors") } +object RouterMetrics extends EntityRecorderFactory[RouterMetrics] { + def category: String = "akka-router" + def createRecorder(instrumentFactory: InstrumentFactory): RouterMetrics = new RouterMetrics(instrumentFactory) +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala index 78d88583..c961737d 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala @@ -19,11 +19,8 @@ package akka.kamon.instrumentation import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } import akka.routing.RoutedActorCell -import kamon.Kamon import kamon.akka.{ RouterMetrics, ActorMetrics } -import ActorMetrics.ActorMetricsRecorder -import RouterMetrics.RouterMetricsRecorder -import kamon.metric.Metrics +import kamon.metric.{ Metrics, Entity } import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -36,12 +33,13 @@ class ActorCellInstrumentation { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + Metrics.get(system).register(ActorMetrics, ref.path.elements.mkString("/")).map { registration ⇒ + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellMetrics.entity = registration.entity + cellMetrics.recorder = Some(registration.recorder) + } - cellMetrics.actorMetricIdentity = metricIdentity - cellMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) } @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") @@ -54,11 +52,11 @@ class ActorCellInstrumentation { val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + TraceContext.withContext(contextAndTimestamp.traceContext) { pjp.proceed() } } finally { - cellMetrics.actorMetricsRecorder.map { am ⇒ + cellMetrics.recorder.map { am ⇒ val processingTime = System.nanoTime() - timestampBeforeProcessing val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime @@ -81,7 +79,7 @@ class ActorCellInstrumentation { @After("sendMessageInActorCell(cell, envelope)") def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.actorMetricsRecorder.map(_.mailboxSize.increment()) + cellMetrics.recorder.map(_.mailboxSize.increment()) } @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") @@ -90,15 +88,15 @@ class ActorCellInstrumentation { @After("actorStop(cell)") def afterStop(cell: ActorCell): Unit = { val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.actorMetricsRecorder.map { _ ⇒ - Kamon(Metrics)(cell.system).unregister(cellMetrics.actorMetricIdentity) + cellMetrics.recorder.map { _ ⇒ + Metrics.get(cell.system).unregister(cellMetrics.entity) } // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. if (cell.isInstanceOf[RoutedActorCell]) { val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - routedCellMetrics.routerMetricsRecorder.map { _ ⇒ - Kamon(Metrics)(cell.system).unregister(routedCellMetrics.routerMetricIdentity) + routedCellMetrics.routerRecorder.map { _ ⇒ + Metrics.get(cell.system).unregister(routedCellMetrics.routerEntity) } } } @@ -109,7 +107,7 @@ class ActorCellInstrumentation { @Before("actorInvokeFailure(cell)") def beforeInvokeFailure(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(_.errors.increment()) + cellWithMetrics.recorder.map(_.errors.increment()) // In case that this actor is behind a router, count the errors for the router as well. val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] @@ -125,12 +123,12 @@ class RoutedActorCellInstrumentation { @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) - val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + Metrics.get(system).register(RouterMetrics, ref.path.elements.mkString("/")).map { registration ⇒ + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - cellMetrics.routerMetricIdentity = metricIdentity - cellMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) + cellMetrics.routerEntity = registration.entity + cellMetrics.routerRecorder = Some(registration.recorder) + } } @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") @@ -143,15 +141,15 @@ class RoutedActorCellInstrumentation { val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + TraceContext.withContext(contextAndTimestamp.traceContext) { // The router metrics recorder will only be picked up if the message is sent from a tracked router. - RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerMetricsRecorder) { + RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerRecorder) { pjp.proceed() } } } finally { - cellMetrics.routerMetricsRecorder map { routerRecorder ⇒ + cellMetrics.routerRecorder map { routerRecorder ⇒ routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) } } @@ -159,25 +157,25 @@ class RoutedActorCellInstrumentation { } trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var entity: Entity = _ + var recorder: Option[ActorMetrics] = None } trait RoutedActorCellMetrics { - var routerMetricIdentity: RouterMetrics = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ + var routerEntity: Entity = _ + var routerRecorder: Option[RouterMetrics] = None } trait RouterAwareEnvelope { - def routerMetricsRecorder: Option[RouterMetricsRecorder] + def routerMetricsRecorder: Option[RouterMetrics] } object RouterAwareEnvelope { import scala.util.DynamicVariable - private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetricsRecorder]](None) + private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetrics]](None) def default: RouterAwareEnvelope = new RouterAwareEnvelope { - val routerMetricsRecorder: Option[RouterMetricsRecorder] = dynamicRouterMetricsRecorder.value + val routerMetricsRecorder: Option[RouterMetrics] = dynamicRouterMetricsRecorder.value } } diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala index e0e5d316..dd998c6b 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala @@ -17,7 +17,7 @@ package akka.kamon.instrumentation import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceRecorder } +import kamon.trace.{ TraceContext, TraceContextAware } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -41,7 +41,7 @@ class ActorLoggingInstrumentation extends MdcKeysSupport { @Around("withMdcInvocation(logSource, logEvent, logStatement)") def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { + TraceContext.withContext(logEvent.traceContext) { withMdc { pjp.proceed() } diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala index 48016876..0cb4ef13 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala @@ -17,7 +17,7 @@ package akka.kamon.instrumentation import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import kamon.trace.{ TraceContextAware, TraceRecorder } +import kamon.trace.{ TraceContext, TraceContextAware } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -31,7 +31,7 @@ class ActorSystemMessageInstrumentation { def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { if (messages.nonEmpty) { val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) + TraceContext.withContext(ctx)(pjp.proceed()) } else pjp.proceed() } @@ -73,7 +73,7 @@ class TraceContextIntoRepointableActorRefMixin { @Around("repointableActorRefCreation(repointableActorRef)") def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { + TraceContext.withContext(repointableActorRef.traceContext) { pjp.proceed() } } diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala index ebddbfc8..28bfcae9 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala @@ -16,40 +16,49 @@ package akka.kamon.instrumentation +import akka.util.Timeout import kamon.Kamon import kamon.akka.Akka -import kamon.trace.{ TraceRecorder, TraceContext, EmptyTraceContext, TraceContextAware } -import akka.actor.{ ActorSystem, ActorRef } +import kamon.trace.{ TraceContext, EmptyTraceContext, TraceContextAware } +import akka.actor.{ InternalActorRef, ActorSystem, ActorRef } import akka.event.Logging.Warning -import akka.pattern.AskTimeoutException +import akka.pattern.{ PromiseActorRef, AskTimeoutException } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.aspectj.lang.reflect.SourceLocation import scala.concurrent.Future import scala.compat.Platform.EOL +import scala.concurrent.duration.FiniteDuration @Aspect class AskPatternInstrumentation { import AskPatternInstrumentation._ - @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, *)") - def askableActorRefAsk(actor: ActorRef): Unit = {} + @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, timeout)") + def askableActorRefAsk(actor: ActorRef, timeout: Timeout): Unit = {} - @Around("askableActorRefAsk(actor)") - def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef): AnyRef = - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val akkaExtension = Kamon(Akka)(system) - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] + @Around("askableActorRefAsk(actor, timeout)") + def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef = + TraceContext.map { ctx ⇒ + actor match { + // the AskPattern will only work for InternalActorRef's with these conditions. + case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒ + val akkaExtension = ctx.lookupExtension(Akka) + val future = pjp.proceed().asInstanceOf[Future[AnyRef]] + val system = ref.provider.guardian.underlying.system - val handler = akkaExtension.askPatternTimeoutWarning match { - case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) - } + val handler = akkaExtension.askPatternTimeoutWarning match { + case "off" ⇒ None + case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) + case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) + } + + handler.map(future.onFailure(_)(akkaExtension.dispatcher)) + future - handler.map(future.onFailure(_)(akkaExtension.dispatcher)) - future + case _ ⇒ pjp.proceed() // + } } getOrElse (pjp.proceed()) diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala new file mode 100644 index 00000000..f4bc31c4 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -0,0 +1,168 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } + +import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import akka.dispatch._ +import akka.kamon.instrumentation.LookupDataAware.LookupData +import kamon.akka.{ AkkaDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics, ForkJoinPoolDispatcherMetrics } +import kamon.metric.{ Metrics, Entity } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class DispatcherInstrumentation { + + @Pointcut("execution(* akka.actor.ActorSystemImpl.start(..)) && this(system)") + def actorSystemInitialization(system: ActorSystemImpl): Unit = {} + + @Before("actorSystemInitialization(system)") + def afterActorSystemInitialization(system: ActorSystemImpl): Unit = { + system.dispatchers.asInstanceOf[ActorSystemAware].actorSystem = system + + // The default dispatcher for the actor system is looked up in the ActorSystemImpl's initialization code and we + // can't get the Metrics extension there since the ActorSystem is not yet fully constructed. To workaround that + // we are manually selecting and registering the default dispatcher with the Metrics extension. All other dispatchers + // will by registered by the instrumentation bellow. + + // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup. + val defaultDispatcher = system.dispatcher + val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate") + executorServiceDelegateField.setAccessible(true) + + val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher) + val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor") + executorField.setAccessible(true) + + val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService] + registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + } + + private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = + executorService match { + case fjp: AkkaForkJoinPool ⇒ + Metrics.get(system).register(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName) + + case tpe: ThreadPoolExecutor ⇒ + Metrics.get(system).register(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName) + + case others ⇒ // Currently not interested in other kinds of dispatchers. + } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers) && args(dispatcherName)") + def dispatchersLookup(dispatchers: ActorSystemAware, dispatcherName: String) = {} + + @Around("dispatchersLookup(dispatchers, dispatcherName)") + def aroundDispatchersLookup(pjp: ProceedingJoinPoint, dispatchers: ActorSystemAware, dispatcherName: String): Any = + LookupDataAware.withLookupData(LookupData(dispatcherName, dispatchers.actorSystem)) { + pjp.proceed() + } + + @Pointcut("initialization(akka.dispatch.ExecutorServiceFactory.new(..)) && target(factory)") + def executorServiceFactoryInitialization(factory: LookupDataAware): Unit = {} + + @After("executorServiceFactoryInitialization(factory)") + def afterExecutorServiceFactoryInitialization(factory: LookupDataAware): Unit = + factory.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactory+.createExecutorService()) && this(factory) && !cflow(execution(* akka.dispatch.Dispatcher.shutdown()))") + def createExecutorService(factory: LookupDataAware): Unit = {} + + @AfterReturning(pointcut = "createExecutorService(factory)", returning = "executorService") + def afterCreateExecutorService(factory: LookupDataAware, executorService: ExecutorService): Unit = { + val lookupData = factory.lookupData + + // lookupData.actorSystem will be null only during the first lookup of the default dispatcher during the + // ActorSystemImpl's initialization. + if (lookupData.actorSystem != null) + registerDispatcher(lookupData.dispatcherName, executorService, lookupData.actorSystem) + } + + @Pointcut("initialization(akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.new(..)) && this(lazyExecutor)") + def lazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorInitialization(lazyExecutor)") + def afterLazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = + lazyExecutor.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.copy()) && this(lazyExecutor)") + def lazyExecutorCopy(lazyExecutor: LookupDataAware): Unit = {} + + @Around("lazyExecutorCopy(lazyExecutor)") + def aroundLazyExecutorCopy(pjp: ProceedingJoinPoint, lazyExecutor: LookupDataAware): Any = + LookupDataAware.withLookupData(lazyExecutor.lookupData) { + pjp.proceed() + } + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.shutdown()) && this(lazyExecutor)") + def lazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorShutdown(lazyExecutor)") + def afterLazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = { + import lazyExecutor.lookupData + + if (lookupData.actorSystem != null) + Metrics.get(lookupData.actorSystem).unregister(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category)) + } + +} + +@Aspect +class DispatcherMetricCollectionInfoIntoDispatcherMixin { + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinActorSystemAwareToDispatchers: ActorSystemAware = ActorSystemAware() + + @DeclareMixin("akka.dispatch.Dispatcher.LazyExecutorServiceDelegate") + def mixinLookupDataAwareToExecutors: LookupDataAware = LookupDataAware() + + @DeclareMixin("akka.dispatch.ExecutorServiceFactory+") + def mixinActorSystemAwareToDispatcher: LookupDataAware = LookupDataAware() +} + +trait ActorSystemAware { + @volatile var actorSystem: ActorSystem = _ +} + +object ActorSystemAware { + def apply(): ActorSystemAware = new ActorSystemAware {} +} + +trait LookupDataAware { + @volatile var lookupData: LookupData = _ +} + +object LookupDataAware { + case class LookupData(dispatcherName: String, actorSystem: ActorSystem) + + private val _currentDispatcherLookupData = new ThreadLocal[LookupData] + + def apply() = new LookupDataAware {} + + def currentLookupData: LookupData = _currentDispatcherLookupData.get() + + def withLookupData[T](lookupData: LookupData)(thunk: ⇒ T): T = { + _currentDispatcherLookupData.set(lookupData) + val result = thunk + _currentDispatcherLookupData.remove() + + result + } +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala deleted file mode 100644 index 8280edca..00000000 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import java.lang.reflect.Method -import java.util.concurrent.ThreadPoolExecutor - -import akka.actor.{ ActorSystemImpl, Cancellable } -import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } -import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement -import kamon.Kamon -import kamon.akka.DispatcherMetrics -import DispatcherMetrics.DispatcherMetricRecorder -import kamon.metric.Metrics -import org.aspectj.lang.annotation._ - -import scala.concurrent.forkjoin.ForkJoinPool - -@Aspect -class DispatcherInstrumentation { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricCollectionInfoIntoDispatcherMixin { - - @DeclareMixin("akka.dispatch.MessageDispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMetricCollectionInfo { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} |