diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-akka/src | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-akka/src')
19 files changed, 630 insertions, 693 deletions
diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf index 4f742ee6..902a682d 100644 --- a/kamon-akka/src/main/resources/reference.conf +++ b/kamon-akka/src/main/resources/reference.conf @@ -13,39 +13,6 @@ kamon { ask-pattern-timeout-warning = off # Default dispatcher for all akka module operations - dispatcher = ${kamon.default-dispatcher} - } - - metrics.precision { - actor { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} - } - - router { - routing-time = ${kamon.metrics.precision.default-histogram-precision} - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - } - - dispatcher { - maximum-pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - running-thread-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - queued-task-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - } + dispatcher = "akka.actor.default-dispatcher" } }
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala index b22f7fa9..c99df586 100644 --- a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala @@ -16,79 +16,26 @@ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric._ -import kamon.metric.instrument.{ Counter, Histogram, MinMaxCounter } +import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder } +import kamon.metric.instrument.{ Time, InstrumentFactory } -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, - errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.collect(context), - timeInMailbox.collect(context), - mailboxSize.collect(context), - errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - mailboxSize.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, - mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = ActorMetricSnapshot - - def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - mailboxSize.merge(that.mailboxSize, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (Errors -> errors)) - } - - val Factory = ActorMetricGroupFactory +/** + * Entity recorder for Akka Actors. The metrics being tracked are: + * + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - mailbox-size: Size of the actor's mailbox. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class ActorMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val mailboxSize = minMaxCounter("mailbox-size") + val errors = counter("errors") } -case object ActorMetricGroupFactory extends MetricGroupFactory { - import kamon.akka.ActorMetrics._ - - type GroupRecorder = ActorMetricsRecorder - - def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - val mailboxSizeConfig = settings.getConfig("mailbox-size") - - new ActorMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - MinMaxCounter.fromConfig(mailboxSizeConfig, system), - Counter()) - } -} +object ActorMetrics extends EntityRecorderFactory[ActorMetrics] { + def category: String = "akka-actor" + def createRecorder(instrumentFactory: InstrumentFactory): ActorMetrics = new ActorMetrics(instrumentFactory) +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index bc013b63..cbca7db6 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -16,8 +16,8 @@ package kamon.akka -import akka.actor -import akka.actor._ +import _root_.akka.actor +import _root_.akka.actor._ import kamon._ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -29,4 +29,6 @@ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Akka def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) -}
\ No newline at end of file + +} + diff --git a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala index 64e16f96..acf92e70 100644 --- a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala @@ -16,79 +16,71 @@ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config +import java.util.concurrent.ThreadPoolExecutor + +import _root_.akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool import kamon.metric._ -import kamon.metric.instrument.Histogram +import kamon.metric.instrument.{ DifferentialValueCollector, InstrumentFactory } -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} +class ForkJoinPoolDispatcherMetrics(fjp: AkkaForkJoinPool, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val paralellism = minMaxCounter("parallelism") + paralellism.increment(fjp.getParallelism) // Steady value. -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" + val poolSize = gauge("pool-size", () ⇒ { + fjp.getPoolSize.toLong + }) - case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name = "pool-size" } + val activeThreads = gauge("active-threads", () ⇒ { + fjp.getActiveThreadCount.toLong + }) - case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, - queueTaskCount: Histogram, poolSize: Histogram) - extends MetricGroupRecorder { + val runningThreads = gauge("running-threads", () ⇒ { + fjp.getRunningThreadCount.toLong + }) - def collect(context: CollectionContext): MetricGroupSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.collect(context), - runningThreadCount.collect(context), - queueTaskCount.collect(context), - poolSize.collect(context)) + val queuedTaskCount = gauge("queued-task-count", () ⇒ { + fjp.getQueuedTaskCount + }) +} - def cleanup: Unit = {} +object ForkJoinPoolDispatcherMetrics { + def factory(fjp: AkkaForkJoinPool) = new EntityRecorderFactory[ForkJoinPoolDispatcherMetrics] { + def category: String = AkkaDispatcherMetrics.Category + def createRecorder(instrumentFactory: InstrumentFactory) = new ForkJoinPoolDispatcherMetrics(fjp, instrumentFactory) } +} - case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, - queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { +class ThreadPoolExecutorDispatcherMetrics(tpe: ThreadPoolExecutor, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val corePoolSize = gauge("core-pool-size", () ⇒ { + tpe.getCorePoolSize.toLong + }) - type GroupSnapshotType = DispatcherMetricSnapshot + val maxPoolSize = gauge("max-pool-size", () ⇒ { + tpe.getMaximumPoolSize.toLong + }) - def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.merge(that.maximumPoolSize, context), - runningThreadCount.merge(that.runningThreadCount, context), - queueTaskCount.merge(that.queueTaskCount, context), - poolSize.merge(that.poolSize, context)) + val poolSize = gauge("pool-size", () ⇒ { + tpe.getPoolSize.toLong + }) - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } + val activeThreads = gauge("active-threads", () ⇒ { + tpe.getActiveCount.toLong + }) - val Factory = DispatcherMetricGroupFactory + val processedTasks = gauge("processed-tasks", DifferentialValueCollector(() ⇒ { + tpe.getTaskCount + })) } -case object DispatcherMetricGroupFactory extends MetricGroupFactory { +object ThreadPoolExecutorDispatcherMetrics { - import kamon.akka.DispatcherMetrics._ - - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") - val runningThreadCountConfig = settings.getConfig("running-thread-count") - val queueTaskCountConfig = settings.getConfig("queued-task-count") - val poolSizeConfig = settings.getConfig("pool-size") - - new DispatcherMetricRecorder( - Histogram.fromConfig(maximumPoolSizeConfig), - Histogram.fromConfig(runningThreadCountConfig), - Histogram.fromConfig(queueTaskCountConfig), - Histogram.fromConfig(poolSizeConfig)) + def factory(tpe: ThreadPoolExecutor) = new EntityRecorderFactory[ThreadPoolExecutorDispatcherMetrics] { + def category: String = AkkaDispatcherMetrics.Category + def createRecorder(instrumentFactory: InstrumentFactory) = new ThreadPoolExecutorDispatcherMetrics(tpe, instrumentFactory) } +} +object AkkaDispatcherMetrics { + val Category = "akka-dispatcher" } diff --git a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala index 2eedf764..5c5bb05a 100644 --- a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala +++ b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala @@ -15,75 +15,26 @@ */ package kamon.akka -import akka.actor.ActorSystem -import com.typesafe.config.Config import kamon.metric._ -import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory } -case class RouterMetrics(name: String) extends MetricGroupIdentity { - val category = RouterMetrics -} - -object RouterMetrics extends MetricGroupCategory { - val name = "router" - - case object RoutingTime extends MetricIdentity { val name = "routing-time" } - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class RouterMetricsRecorder(routingTime: Histogram, processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(routingTime.collect(context), processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) - - def cleanup: Unit = { - routingTime.cleanup - processingTime.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class RouterMetricSnapshot(routingTime: Histogram.Snapshot, processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = RouterMetricSnapshot - - def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot( - routingTime.merge(that.routingTime, context), - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - RoutingTime -> routingTime, - ProcessingTime -> processingTime, - TimeInMailbox -> timeInMailbox, - Errors -> errors) - } - - val Factory = RouterMetricGroupFactory -} - -case object RouterMetricGroupFactory extends MetricGroupFactory { - - import kamon.akka.RouterMetrics._ - - type GroupRecorder = RouterMetricsRecorder - - def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { - val settings = config.getConfig("precision.router") - - val routingTimeConfig = settings.getConfig("routing-time") - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - - new RouterMetricsRecorder( - Histogram.fromConfig(routingTimeConfig), - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - Counter()) - } +/** + * Entity recorder for Akka Routers. The metrics being tracked are: + * + * - routing-time: Time taken for the router to process the routing logic. + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class RouterMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val routingTime = histogram("routing-time", Time.Nanoseconds) + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val errors = counter("errors") } +object RouterMetrics extends EntityRecorderFactory[RouterMetrics] { + def category: String = "akka-router" + def createRecorder(instrumentFactory: InstrumentFactory): RouterMetrics = new RouterMetrics(instrumentFactory) +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala index 78d88583..c961737d 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala @@ -19,11 +19,8 @@ package akka.kamon.instrumentation import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } import akka.routing.RoutedActorCell -import kamon.Kamon import kamon.akka.{ RouterMetrics, ActorMetrics } -import ActorMetrics.ActorMetricsRecorder -import RouterMetrics.RouterMetricsRecorder -import kamon.metric.Metrics +import kamon.metric.{ Metrics, Entity } import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -36,12 +33,13 @@ class ActorCellInstrumentation { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + Metrics.get(system).register(ActorMetrics, ref.path.elements.mkString("/")).map { registration ⇒ + val cellMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellMetrics.entity = registration.entity + cellMetrics.recorder = Some(registration.recorder) + } - cellMetrics.actorMetricIdentity = metricIdentity - cellMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) } @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") @@ -54,11 +52,11 @@ class ActorCellInstrumentation { val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + TraceContext.withContext(contextAndTimestamp.traceContext) { pjp.proceed() } } finally { - cellMetrics.actorMetricsRecorder.map { am ⇒ + cellMetrics.recorder.map { am ⇒ val processingTime = System.nanoTime() - timestampBeforeProcessing val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime @@ -81,7 +79,7 @@ class ActorCellInstrumentation { @After("sendMessageInActorCell(cell, envelope)") def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.actorMetricsRecorder.map(_.mailboxSize.increment()) + cellMetrics.recorder.map(_.mailboxSize.increment()) } @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") @@ -90,15 +88,15 @@ class ActorCellInstrumentation { @After("actorStop(cell)") def afterStop(cell: ActorCell): Unit = { val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.actorMetricsRecorder.map { _ ⇒ - Kamon(Metrics)(cell.system).unregister(cellMetrics.actorMetricIdentity) + cellMetrics.recorder.map { _ ⇒ + Metrics.get(cell.system).unregister(cellMetrics.entity) } // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. if (cell.isInstanceOf[RoutedActorCell]) { val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - routedCellMetrics.routerMetricsRecorder.map { _ ⇒ - Kamon(Metrics)(cell.system).unregister(routedCellMetrics.routerMetricIdentity) + routedCellMetrics.routerRecorder.map { _ ⇒ + Metrics.get(cell.system).unregister(routedCellMetrics.routerEntity) } } } @@ -109,7 +107,7 @@ class ActorCellInstrumentation { @Before("actorInvokeFailure(cell)") def beforeInvokeFailure(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(_.errors.increment()) + cellWithMetrics.recorder.map(_.errors.increment()) // In case that this actor is behind a router, count the errors for the router as well. val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] @@ -125,12 +123,12 @@ class RoutedActorCellInstrumentation { @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) - val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] + Metrics.get(system).register(RouterMetrics, ref.path.elements.mkString("/")).map { registration ⇒ + val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - cellMetrics.routerMetricIdentity = metricIdentity - cellMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) + cellMetrics.routerEntity = registration.entity + cellMetrics.routerRecorder = Some(registration.recorder) + } } @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") @@ -143,15 +141,15 @@ class RoutedActorCellInstrumentation { val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { + TraceContext.withContext(contextAndTimestamp.traceContext) { // The router metrics recorder will only be picked up if the message is sent from a tracked router. - RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerMetricsRecorder) { + RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerRecorder) { pjp.proceed() } } } finally { - cellMetrics.routerMetricsRecorder map { routerRecorder ⇒ + cellMetrics.routerRecorder map { routerRecorder ⇒ routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) } } @@ -159,25 +157,25 @@ class RoutedActorCellInstrumentation { } trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var entity: Entity = _ + var recorder: Option[ActorMetrics] = None } trait RoutedActorCellMetrics { - var routerMetricIdentity: RouterMetrics = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ + var routerEntity: Entity = _ + var routerRecorder: Option[RouterMetrics] = None } trait RouterAwareEnvelope { - def routerMetricsRecorder: Option[RouterMetricsRecorder] + def routerMetricsRecorder: Option[RouterMetrics] } object RouterAwareEnvelope { import scala.util.DynamicVariable - private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetricsRecorder]](None) + private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetrics]](None) def default: RouterAwareEnvelope = new RouterAwareEnvelope { - val routerMetricsRecorder: Option[RouterMetricsRecorder] = dynamicRouterMetricsRecorder.value + val routerMetricsRecorder: Option[RouterMetrics] = dynamicRouterMetricsRecorder.value } } diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala index e0e5d316..dd998c6b 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorLoggingInstrumentation.scala @@ -17,7 +17,7 @@ package akka.kamon.instrumentation import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceRecorder } +import kamon.trace.{ TraceContext, TraceContextAware } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -41,7 +41,7 @@ class ActorLoggingInstrumentation extends MdcKeysSupport { @Around("withMdcInvocation(logSource, logEvent, logStatement)") def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { + TraceContext.withContext(logEvent.traceContext) { withMdc { pjp.proceed() } diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala index 48016876..0cb4ef13 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentation.scala @@ -17,7 +17,7 @@ package akka.kamon.instrumentation import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import kamon.trace.{ TraceContextAware, TraceRecorder } +import kamon.trace.{ TraceContext, TraceContextAware } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -31,7 +31,7 @@ class ActorSystemMessageInstrumentation { def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { if (messages.nonEmpty) { val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) + TraceContext.withContext(ctx)(pjp.proceed()) } else pjp.proceed() } @@ -73,7 +73,7 @@ class TraceContextIntoRepointableActorRefMixin { @Around("repointableActorRefCreation(repointableActorRef)") def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { + TraceContext.withContext(repointableActorRef.traceContext) { pjp.proceed() } } diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala index ebddbfc8..28bfcae9 100644 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala @@ -16,40 +16,49 @@ package akka.kamon.instrumentation +import akka.util.Timeout import kamon.Kamon import kamon.akka.Akka -import kamon.trace.{ TraceRecorder, TraceContext, EmptyTraceContext, TraceContextAware } -import akka.actor.{ ActorSystem, ActorRef } +import kamon.trace.{ TraceContext, EmptyTraceContext, TraceContextAware } +import akka.actor.{ InternalActorRef, ActorSystem, ActorRef } import akka.event.Logging.Warning -import akka.pattern.AskTimeoutException +import akka.pattern.{ PromiseActorRef, AskTimeoutException } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.aspectj.lang.reflect.SourceLocation import scala.concurrent.Future import scala.compat.Platform.EOL +import scala.concurrent.duration.FiniteDuration @Aspect class AskPatternInstrumentation { import AskPatternInstrumentation._ - @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, *)") - def askableActorRefAsk(actor: ActorRef): Unit = {} + @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && args(actor, *, timeout)") + def askableActorRefAsk(actor: ActorRef, timeout: Timeout): Unit = {} - @Around("askableActorRefAsk(actor)") - def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef): AnyRef = - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val akkaExtension = Kamon(Akka)(system) - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] + @Around("askableActorRefAsk(actor, timeout)") + def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef = + TraceContext.map { ctx ⇒ + actor match { + // the AskPattern will only work for InternalActorRef's with these conditions. + case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒ + val akkaExtension = ctx.lookupExtension(Akka) + val future = pjp.proceed().asInstanceOf[Future[AnyRef]] + val system = ref.provider.guardian.underlying.system - val handler = akkaExtension.askPatternTimeoutWarning match { - case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) - } + val handler = akkaExtension.askPatternTimeoutWarning match { + case "off" ⇒ None + case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) + case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) + } + + handler.map(future.onFailure(_)(akkaExtension.dispatcher)) + future - handler.map(future.onFailure(_)(akkaExtension.dispatcher)) - future + case _ ⇒ pjp.proceed() // + } } getOrElse (pjp.proceed()) diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala new file mode 100644 index 00000000..f4bc31c4 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -0,0 +1,168 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } + +import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import akka.dispatch._ +import akka.kamon.instrumentation.LookupDataAware.LookupData +import kamon.akka.{ AkkaDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics, ForkJoinPoolDispatcherMetrics } +import kamon.metric.{ Metrics, Entity } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class DispatcherInstrumentation { + + @Pointcut("execution(* akka.actor.ActorSystemImpl.start(..)) && this(system)") + def actorSystemInitialization(system: ActorSystemImpl): Unit = {} + + @Before("actorSystemInitialization(system)") + def afterActorSystemInitialization(system: ActorSystemImpl): Unit = { + system.dispatchers.asInstanceOf[ActorSystemAware].actorSystem = system + + // The default dispatcher for the actor system is looked up in the ActorSystemImpl's initialization code and we + // can't get the Metrics extension there since the ActorSystem is not yet fully constructed. To workaround that + // we are manually selecting and registering the default dispatcher with the Metrics extension. All other dispatchers + // will by registered by the instrumentation bellow. + + // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup. + val defaultDispatcher = system.dispatcher + val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate") + executorServiceDelegateField.setAccessible(true) + + val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher) + val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor") + executorField.setAccessible(true) + + val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService] + registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + } + + private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = + executorService match { + case fjp: AkkaForkJoinPool ⇒ + Metrics.get(system).register(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName) + + case tpe: ThreadPoolExecutor ⇒ + Metrics.get(system).register(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName) + + case others ⇒ // Currently not interested in other kinds of dispatchers. + } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers) && args(dispatcherName)") + def dispatchersLookup(dispatchers: ActorSystemAware, dispatcherName: String) = {} + + @Around("dispatchersLookup(dispatchers, dispatcherName)") + def aroundDispatchersLookup(pjp: ProceedingJoinPoint, dispatchers: ActorSystemAware, dispatcherName: String): Any = + LookupDataAware.withLookupData(LookupData(dispatcherName, dispatchers.actorSystem)) { + pjp.proceed() + } + + @Pointcut("initialization(akka.dispatch.ExecutorServiceFactory.new(..)) && target(factory)") + def executorServiceFactoryInitialization(factory: LookupDataAware): Unit = {} + + @After("executorServiceFactoryInitialization(factory)") + def afterExecutorServiceFactoryInitialization(factory: LookupDataAware): Unit = + factory.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactory+.createExecutorService()) && this(factory) && !cflow(execution(* akka.dispatch.Dispatcher.shutdown()))") + def createExecutorService(factory: LookupDataAware): Unit = {} + + @AfterReturning(pointcut = "createExecutorService(factory)", returning = "executorService") + def afterCreateExecutorService(factory: LookupDataAware, executorService: ExecutorService): Unit = { + val lookupData = factory.lookupData + + // lookupData.actorSystem will be null only during the first lookup of the default dispatcher during the + // ActorSystemImpl's initialization. + if (lookupData.actorSystem != null) + registerDispatcher(lookupData.dispatcherName, executorService, lookupData.actorSystem) + } + + @Pointcut("initialization(akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.new(..)) && this(lazyExecutor)") + def lazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorInitialization(lazyExecutor)") + def afterLazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = + lazyExecutor.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.copy()) && this(lazyExecutor)") + def lazyExecutorCopy(lazyExecutor: LookupDataAware): Unit = {} + + @Around("lazyExecutorCopy(lazyExecutor)") + def aroundLazyExecutorCopy(pjp: ProceedingJoinPoint, lazyExecutor: LookupDataAware): Any = + LookupDataAware.withLookupData(lazyExecutor.lookupData) { + pjp.proceed() + } + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.shutdown()) && this(lazyExecutor)") + def lazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorShutdown(lazyExecutor)") + def afterLazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = { + import lazyExecutor.lookupData + + if (lookupData.actorSystem != null) + Metrics.get(lookupData.actorSystem).unregister(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category)) + } + +} + +@Aspect +class DispatcherMetricCollectionInfoIntoDispatcherMixin { + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinActorSystemAwareToDispatchers: ActorSystemAware = ActorSystemAware() + + @DeclareMixin("akka.dispatch.Dispatcher.LazyExecutorServiceDelegate") + def mixinLookupDataAwareToExecutors: LookupDataAware = LookupDataAware() + + @DeclareMixin("akka.dispatch.ExecutorServiceFactory+") + def mixinActorSystemAwareToDispatcher: LookupDataAware = LookupDataAware() +} + +trait ActorSystemAware { + @volatile var actorSystem: ActorSystem = _ +} + +object ActorSystemAware { + def apply(): ActorSystemAware = new ActorSystemAware {} +} + +trait LookupDataAware { + @volatile var lookupData: LookupData = _ +} + +object LookupDataAware { + case class LookupData(dispatcherName: String, actorSystem: ActorSystem) + + private val _currentDispatcherLookupData = new ThreadLocal[LookupData] + + def apply() = new LookupDataAware {} + + def currentLookupData: LookupData = _currentDispatcherLookupData.get() + + def withLookupData[T](lookupData: LookupData)(thunk: ⇒ T): T = { + _currentDispatcherLookupData.set(lookupData) + val result = thunk + _currentDispatcherLookupData.remove() + + result + } +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala deleted file mode 100644 index 8280edca..00000000 --- a/kamon-akka/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import java.lang.reflect.Method -import java.util.concurrent.ThreadPoolExecutor - -import akka.actor.{ ActorSystemImpl, Cancellable } -import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } -import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement -import kamon.Kamon -import kamon.akka.DispatcherMetrics -import DispatcherMetrics.DispatcherMetricRecorder -import kamon.metric.Metrics -import org.aspectj.lang.annotation._ - -import scala.concurrent.forkjoin.ForkJoinPool - -@Aspect -class DispatcherInstrumentation { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricCollectionInfoIntoDispatcherMixin { - - @DeclareMixin("akka.dispatch.MessageDispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMetricCollectionInfo { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} diff --git a/kamon-akka/src/test/resources/logback.xml b/kamon-akka/src/test/resources/logback.xml deleted file mode 100644 index 10c9aa35..00000000 --- a/kamon-akka/src/test/resources/logback.xml +++ /dev/null @@ -1,14 +0,0 @@ -<configuration scan="true"> - <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> - </encoder> - </appender> - - <root level="error"> - <appender-ref ref="STDOUT" /> - </root> - -</configuration> diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala index 06a232bd..8f7ae613 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala @@ -15,35 +15,32 @@ * ========================================================== */ package kamon.instrumentation.akka -import akka.actor.{ Actor, ActorSystem, Props } +import akka.actor.{ Actor, Props } import akka.pattern.{ ask, pipe } import akka.routing._ -import akka.testkit.{ TestKitBase, ImplicitSender, TestKit } import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.TraceRecorder -import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } +import kamon.testkit.BaseKamonSpec +import kamon.trace.TraceContext import scala.concurrent.duration._ -class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with ImplicitSender with BeforeAndAfterAll { - implicit lazy val system: ActorSystem = ActorSystem("actor-cell-instrumentation-spec") - implicit val executionContext = system.dispatcher +class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumentation-spec") { + implicit lazy val executionContext = system.dispatcher "the message passing instrumentation" should { "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { + val testTraceContext = TraceContext.withContext(newContext("bang-reply")) { ctxEchoActor ! "test" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { + val testTraceContext = TraceContext.withContext(newContext("tell-reply")) { ctxEchoActor.tell("test", testActor) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) @@ -51,37 +48,37 @@ class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with Im "propagate the TraceContext using ask" in new EchoActorFixture { implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { + val testTraceContext = TraceContext.withContext(newContext("ask-reply")) { // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. (ctxEchoActor ? "test") pipeTo (testActor) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a simple router" in new EchoSimpleRouterFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + val testTraceContext = TraceContext.withContext(newContext("router-reply")) { router.route("test", testActor) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a pool router" in new EchoPoolRouterFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + val testTraceContext = TraceContext.withContext(newContext("router-reply")) { pool ! "test" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a group router" in new EchoGroupRouterFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + val testTraceContext = TraceContext.withContext(newContext("router-reply")) { group ! "test" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) @@ -119,7 +116,7 @@ class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with Im class TraceContextEcho extends Actor { def receive = { - case msg: String ⇒ sender ! TraceRecorder.currentContext + case msg: String ⇒ sender ! TraceContext.currentContext } } diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala index 598e9327..21706af9 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala @@ -15,28 +15,33 @@ * ========================================================== */ package kamon.instrumentation.akka -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } +import akka.actor.{ Actor, ActorLogging, Props } import akka.event.Logging.LogEvent -import akka.testkit.TestKitBase import com.typesafe.config.ConfigFactory +import kamon.testkit.BaseKamonSpec import kamon.trace.TraceLocal.AvailableToMdc import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceLocal, TraceRecorder } -import org.scalatest.{ BeforeAndAfterAll, Inspectors, Matchers, WordSpecLike } +import kamon.trace.{ TraceContextAware, TraceLocal, TraceContext } +import org.scalatest.Inspectors import org.slf4j.MDC -class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with Inspectors with MdcKeysSupport with BeforeAndAfterAll { - implicit lazy val system: ActorSystem = ActorSystem("actor-logging-instrumentation-spec", - ConfigFactory.parseString("""akka.loggers = ["akka.event.slf4j.Slf4jLogger"]""")) +class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instrumentation-spec") with Inspectors with MdcKeysSupport { + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger"] + |} + """.stripMargin) "the ActorLogging instrumentation" should { "attach the TraceContext (if available) to log events" in { val loggerActor = system.actorOf(Props[LoggerActor]) system.eventStream.subscribe(testActor, classOf[LogEvent]) - val testTraceContext = TraceRecorder.withNewTraceContext("logging") { + val testTraceContext = TraceContext.withContext(newContext("logging")) { loggerActor ! "info" - TraceRecorder.currentContext + TraceContext.currentContext } fishForMessage() { @@ -50,7 +55,7 @@ class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with "allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in { val testString = "Hello World" - TraceRecorder.withNewTraceContext("logging-with-mdc") { + TraceContext.withContext(newContext("logging-with-mdc")) { TraceLocal.store(AvailableToMdc("some-cool-key"))(testString) withMdc { @@ -66,6 +71,6 @@ class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with class LoggerActor extends Actor with ActorLogging { def receive = { - case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceRecorder.currentContext.name, TraceRecorder.currentContext.token) + case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceContext.currentContext.name, TraceContext.currentContext.token) } } diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala index 0e9025af..8c1033ae 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala @@ -2,49 +2,53 @@ package kamon.instrumentation.akka import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } import akka.actor._ -import akka.testkit.{ TestKitBase, ImplicitSender } +import akka.testkit.ImplicitSender import com.typesafe.config.ConfigFactory -import kamon.trace.{ EmptyTraceContext, TraceRecorder } +import kamon.testkit.BaseKamonSpec +import kamon.trace.{ EmptyTraceContext, TraceContext } import org.scalatest.WordSpecLike import scala.concurrent.duration._ import scala.util.control.NonFatal -class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLike with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("actor-system-message-instrumentation-spec", ConfigFactory.parseString( - """ - |akka.loglevel = OFF - """.stripMargin)) +class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-message-instrumentation-spec") with WordSpecLike with ImplicitSender { + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loglevel = OFF + |} + """.stripMargin) - implicit val executionContext = system.dispatcher + implicit lazy val executionContext = system.dispatcher "the system message passing instrumentation" should { "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { + val testTraceContext = TraceContext.withContext(newContext("creating-top-level-actor")) { system.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext + testActor ! TraceContext.currentContext def receive: Actor.Receive = { case any ⇒ } })) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { + val testTraceContext = TraceContext.withContext(newContext("creating-non-top-level-actor")) { system.actorOf(Props(new Actor { def receive: Actor.Receive = { case any ⇒ context.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext + testActor ! TraceContext.currentContext def receive: Actor.Receive = { case any ⇒ } })) } })) ! "any" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) @@ -54,9 +58,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the actor is resumed" in { val supervisor = supervisorWithDirective(Resume) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-resume")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -69,9 +73,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the actor is restarted" in { val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-restart")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -86,9 +90,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the actor is stopped" in { val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-stop")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -99,9 +103,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the failure is escalated" in { val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-escalate")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -119,7 +123,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik val child = context.actorOf(Props(new Parent)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop + case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; Stop } def receive = { @@ -131,7 +135,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik val child = context.actorOf(Props(new Child)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive + case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; directive } def receive: Actor.Receive = { @@ -139,7 +143,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik } override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext + if (sendPostStop) testActor ! TraceContext.currentContext super.postStop() } } @@ -147,26 +151,26 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik class Child extends Actor { def receive = { case "fail" ⇒ throw new ArithmeticException("Division by zero.") - case "context" ⇒ sender ! TraceRecorder.currentContext + case "context" ⇒ sender ! TraceContext.currentContext } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceRecorder.currentContext + if (sendPreRestart) testActor ! TraceContext.currentContext super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceRecorder.currentContext + if (sendPostRestart) testActor ! TraceContext.currentContext super.postRestart(reason) } override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext + if (sendPostStop) testActor ! TraceContext.currentContext super.postStop() } override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceRecorder.currentContext + if (sendPreStart) testActor ! TraceContext.currentContext super.preStart() } } diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala index 5c9905ba..0d63a19e 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala @@ -21,21 +21,26 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import akka.event.Logging.Warning import akka.pattern.ask -import akka.testkit.{ TestProbe, TestKitBase } +import akka.testkit.TestProbe import akka.util.Timeout import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.akka.Akka -import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder } -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import kamon.testkit.BaseKamonSpec +import kamon.trace.{ TraceContext, TraceContextAware } import scala.concurrent.duration._ -class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", - ConfigFactory.parseString("""akka.loggers = ["akka.event.slf4j.Slf4jLogger"]""")) +class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loglevel = OFF + |} + """.stripMargin) - implicit val ec = system.dispatcher + implicit lazy val ec = system.dispatcher implicit val askTimeout = Timeout(10 millis) // TODO: Make this work with ActorSelections @@ -46,9 +51,9 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M setAskPatternTimeoutWarningMode("heavyweight") expectTimeoutWarning() { - TraceRecorder.withNewTraceContext("ask-timeout-warning") { + TraceContext.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceRecorder.currentContext + TraceContext.currentContext } } } @@ -59,9 +64,9 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M setAskPatternTimeoutWarningMode("lightweight") expectTimeoutWarning(messageSizeLimit = Some(1)) { - TraceRecorder.withNewTraceContext("ask-timeout-warning") { + TraceContext.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceRecorder.currentContext + TraceContext.currentContext } } } @@ -72,9 +77,9 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M setAskPatternTimeoutWarningMode("off") expectTimeoutWarning(expectWarning = false) { - TraceRecorder.withNewTraceContext("ask-timeout-warning") { + TraceContext.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceRecorder.currentContext + TraceContext.currentContext } } } diff --git a/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala index 6d16386b..322abed2 100644 --- a/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala @@ -20,44 +20,29 @@ import java.nio.LongBuffer import kamon.Kamon import kamon.akka.ActorMetrics import kamon.metric.ActorMetricsTestActor._ +import kamon.metric.instrument.CollectionContext import org.scalatest.{ BeforeAndAfterAll, WordSpecLike, Matchers } import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase } import akka.actor._ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -import ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { + |kamon.metric { | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | actor { - | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision.actor { - | processing-time { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | time-in-mailbox { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 + | filters { + | akka-actor { + | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] + | excludes = [ "user/tracked-explicitly-excluded", "user/non-tracked-actor" ] | } + | } | - | mailbox-size { - | refresh-interval = 1 hour - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } + | instrument-settings { + | akka-actor.mailbox-size.refresh-interval = 1 hour | } |} | @@ -89,16 +74,16 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with expectMsg(Pong) val firstSnapshot = collectMetricsOf(trackedActor).get - firstSnapshot.errors.count should be(1L) - firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L - firstSnapshot.processingTime.numberOfMeasurements should be(102L) // 102 examples - firstSnapshot.timeInMailbox.numberOfMeasurements should be(102L) // 102 examples + firstSnapshot.counter("errors").get.count should be(1L) + firstSnapshot.minMaxCounter("mailbox-size").get.numberOfMeasurements should be > 0L + firstSnapshot.histogram("processing-time").get.numberOfMeasurements should be(102L) // 102 examples + firstSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(102L) // 102 examples val secondSnapshot = collectMetricsOf(trackedActor).get // Ensure that the recorders are clean - secondSnapshot.errors.count should be(0L) - secondSnapshot.mailboxSize.numberOfMeasurements should be(3L) // min, max and current - secondSnapshot.processingTime.numberOfMeasurements should be(0L) - secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) + secondSnapshot.counter("errors").get.count should be(0L) + secondSnapshot.minMaxCounter("mailbox-size").get.numberOfMeasurements should be(3L) // min, max and current + secondSnapshot.histogram("processing-time").get.numberOfMeasurements should be(0L) + secondSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(0L) } } @@ -109,9 +94,9 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = expectMsgType[TrackedTimings] val snapshot = collectMetricsOf(trackedActor).get - snapshot.processingTime.numberOfMeasurements should be(1L) - snapshot.processingTime.recordsIterator.next().count should be(1L) - snapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + snapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + snapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + snapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the number of errors" in new ActorMetricsFixtures { @@ -122,7 +107,7 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with expectMsg(Pong) val snapshot = collectMetricsOf(trackedActor).get - snapshot.errors.count should be(10) + snapshot.counter("errors").get.count should be(10) } "record the mailbox-size" in new ActorMetricsFixtures { @@ -138,8 +123,8 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with expectMsg(Pong) val snapshot = collectMetricsOf(trackedActor).get - snapshot.mailboxSize.min should be(0L) - snapshot.mailboxSize.max should be(11L +- 1L) + snapshot.minMaxCounter("mailbox-size").get.min should be(0L) + snapshot.minMaxCounter("mailbox-size").get.max should be(11L +- 1L) } "record the time-in-mailbox" in new ActorMetricsFixtures { @@ -149,20 +134,22 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = expectMsgType[TrackedTimings] val snapshot = collectMetricsOf(trackedActor).get - snapshot.timeInMailbox.numberOfMeasurements should be(1L) - snapshot.timeInMailbox.recordsIterator.next().count should be(1L) - snapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + snapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + snapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + snapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "clean up the associated recorder when the actor is stopped" in new ActorMetricsFixtures { val trackedActor = createTestActor("stop") + val firstRecorder = actorMetricsRecorderOf(trackedActor).get + // Killing the actor should remove it's ActorMetrics and registering again bellow should create a new one. val deathWatcher = TestProbe() deathWatcher.watch(trackedActor) trackedActor ! PoisonPill deathWatcher.expectTerminated(trackedActor) - actorMetricsRecorderOf(trackedActor) shouldBe empty + actorMetricsRecorderOf(trackedActor).get shouldNot be theSameInstanceAs (firstRecorder) } } @@ -175,10 +162,10 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = - Kamon(Metrics)(system).storage.get(ActorMetrics(actorRecorderName(ref))).map(_.asInstanceOf[ActorMetricsRecorder]) + def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] = + Kamon(Metrics)(system).register(ActorMetrics, actorRecorderName(ref)).map(_.recorder) - def collectMetricsOf(ref: ActorRef): Option[ActorMetricSnapshot] = { + def collectMetricsOf(ref: ActorRef): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. actorMetricsRecorderOf(ref).map(_.collect(collectionContext)) } diff --git a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala index 55af3f2e..2c530da9 100644 --- a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala @@ -15,96 +15,199 @@ package kamon.metric -import akka.actor.{ ActorRef, ActorSystem, Props } +import java.nio.LongBuffer + +import akka.actor.{ PoisonPill, Props, ActorRef, ActorSystem } +import akka.dispatch.MessageDispatcher import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.DispatcherMetrics -import DispatcherMetrics.DispatcherMetricSnapshot -import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.akka.{ ForkJoinPoolDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics } +import kamon.metric.ActorMetricsTestActor.{ Pong, Ping } +import kamon.metric.instrument.CollectionContext +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { +class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { - | tick-interval = 1 second + |kamon.metric { + | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | dispatcher { - | includes = ["*"] - | excludes = ["dispatcher-explicitly-excluded"] - | } + | filters = { + | akka-dispatcher { + | includes = [ "*" ] + | excludes = [ "explicitly-excluded" ] | } - | ] + | } + | + | default-instrument-settings { + | gauge.refresh-interval = 1 hour + | min-max-counter.refresh-interval = 1 hour + | } + |} + | + |explicitly-excluded { + | type = "Dispatcher" + | executor = "fork-join-executor" |} | - |dispatcher-explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" + |tracked-fjp { + | type = "Dispatcher" + | executor = "fork-join-executor" + | + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 100.0 + | parallelism-max = 22 + | } |} | - |tracked-dispatcher { - | type = "Dispatcher" - | executor = "thread-pool-executor" + |tracked-tpe { + | type = "Dispatcher" + | executor = "thread-pool-executor" + | + | thread-pool-executor { + | core-pool-size-min = 7 + | core-pool-size-factor = 100.0 + | max-pool-size-factor = 100.0 + | max-pool-size-max = 21 + | } |} | """.stripMargin)) "the Kamon dispatcher metrics" should { "respect the configured include and exclude filters" in { - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") + val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher")) + val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp")) + val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) + val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) + + findDispatcherRecorder(defaultDispatcher) shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) + findDispatcherRecorder(excludedDispatcher) should be(empty) + } - Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] + "record metrics for a dispatcher with thread-pool-executor" in { + implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") + collectDispatcherMetrics(tpeDispatcher) - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) - tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) - } + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(tpeDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(tpeDispatcher) + val snapshot = collectDispatcherMetrics(tpeDispatcher) + + snapshot.gauge("active-threads") should not be empty + snapshot.gauge("pool-size").get.min should be >= 7L + snapshot.gauge("pool-size").get.max should be <= 21L + snapshot.gauge("max-pool-size").get.max should be(21) + snapshot.gauge("core-pool-size").get.max should be(21) + snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) + + // The processed tasks should be reset to 0 if no more tasks are submitted. + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher) + secondSnapshot.gauge("processed-tasks").get.max should be(0) } - "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") + "record metrics for a dispatcher with fork-join-executor" in { + implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") + collectDispatcherMetrics(fjpDispatcher) + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) - for (_ ← 1 to 100) { - //delayable ! Discard - } + refreshDispatcherInstruments(fjpDispatcher) + val snapshot = collectDispatcherMetrics(fjpDispatcher) + + snapshot.minMaxCounter("parallelism").get.max should be(22) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 22L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) - val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) - dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis - dispatcherMetrics.poolSize.max should be <= 22L //fail in travis - dispatcherMetrics.queueTaskCount.max should be(0L) - dispatcherMetrics.runningThreadCount.max should be(0L) } - } + "clean up the metrics recorders after a dispatcher is shut down" in { + implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") + implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") + + findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) - def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] + shutdownDispatcher(tpeDispatcher) + shutdownDispatcher(fjpDispatcher) + + findDispatcherRecorder(fjpDispatcher) should be(empty) + findDispatcherRecorder(tpeDispatcher) should be(empty) } - val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) - dispatcherMetricsOption should not be empty - dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] + + } + + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) } - trait DelayableActorFixture { - def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name) - val metricsListener = TestProbe() + def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] + def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] = + Kamon(Metrics)(system).find(dispatcher.id, "akka-dispatcher") - (actor, metricsListener) + def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot = + findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get + + def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = { + findDispatcherRecorder(dispatcher) match { + case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒ + tpe.processedTasks.refreshValue() + tpe.activeThreads.refreshValue() + tpe.maxPoolSize.refreshValue() + tpe.poolSize.refreshValue() + tpe.corePoolSize.refreshValue() + + case Some(fjp: ForkJoinPoolDispatcherMetrics) ⇒ + fjp.activeThreads.refreshValue() + fjp.poolSize.refreshValue() + fjp.queuedTaskCount.refreshValue() + fjp.paralellism.refreshValues() + fjp.runningThreads.refreshValue() + + case other ⇒ } } + + def forceInit(dispatcher: MessageDispatcher): MessageDispatcher = { + val listener = TestProbe() + Future { + listener.ref ! "init done" + }(dispatcher) + listener.expectMsg("init done") + + dispatcher + } + + def submit(dispatcher: MessageDispatcher): Future[String] = Future { + "hello" + }(dispatcher) + + def shutdownDispatcher(dispatcher: MessageDispatcher): Unit = { + val shutdownMethod = dispatcher.getClass.getDeclaredMethod("shutdown") + shutdownMethod.setAccessible(true) + shutdownMethod.invoke(dispatcher) + } + + override protected def afterAll(): Unit = system.shutdown() } + diff --git a/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala index abc195ba..5f6bbb4f 100644 --- a/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala @@ -18,17 +18,13 @@ package kamon.metric import java.nio.LongBuffer import akka.actor._ -import akka.kamon.instrumentation.ActorCellMetrics import akka.routing._ import akka.testkit.{ ImplicitSender, TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.{ RouterMetrics, ActorMetrics } -import ActorMetrics.{ ActorMetricSnapshot, ActorMetricsRecorder } -import RouterMetrics._ +import kamon.akka.RouterMetrics import kamon.metric.RouterMetricsTestActor._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.instrument.CollectionContext import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } import scala.concurrent.duration._ @@ -36,22 +32,14 @@ import scala.concurrent.duration._ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { + |kamon.metric { | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] - | excludes = [ "user/tracked-explicitly-excluded-*"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 + | filters = { + | akka-router { + | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] + | excludes = [ "user/tracked-explicitly-excluded-*"] | } | } |} @@ -85,7 +73,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-pool-router").get - routerSnapshot.routingTime.numberOfMeasurements should be(1L) + routerSnapshot.histogram("routing-time").get.numberOfMeasurements should be(1L) } "record the routing-time of the receive function for group routers" in new RouterMetricsFixtures { @@ -96,7 +84,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-group-router").get - routerSnapshot.routingTime.numberOfMeasurements should be(1L) + routerSnapshot.histogram("routing-time").get.numberOfMeasurements should be(1L) } "record the processing-time of the receive function for pool routers" in new RouterMetricsFixtures { @@ -107,9 +95,9 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-pool-router").get - routerSnapshot.processingTime.numberOfMeasurements should be(1L) - routerSnapshot.processingTime.recordsIterator.next().count should be(1L) - routerSnapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + routerSnapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the processing-time of the receive function for group routers" in new RouterMetricsFixtures { @@ -120,9 +108,9 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-group-router").get - routerSnapshot.processingTime.numberOfMeasurements should be(1L) - routerSnapshot.processingTime.recordsIterator.next().count should be(1L) - routerSnapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + routerSnapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the number of errors for pool routers" in new RouterMetricsFixtures { @@ -137,7 +125,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-errors-in-pool-router").get - routerSnapshot.errors.count should be(10L) + routerSnapshot.counter("errors").get.count should be(10L) } "record the number of errors for group routers" in new RouterMetricsFixtures { @@ -152,7 +140,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-errors-in-group-router").get - routerSnapshot.errors.count should be(10L) + routerSnapshot.counter("errors").get.count should be(10L) } "record the time-in-mailbox for pool routers" in new RouterMetricsFixtures { @@ -163,9 +151,9 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-pool-router").get - routerSnapshot.timeInMailbox.numberOfMeasurements should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().count should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + routerSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "record the time-in-mailbox for group routers" in new RouterMetricsFixtures { @@ -176,33 +164,35 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-group-router").get - routerSnapshot.timeInMailbox.numberOfMeasurements should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().count should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + routerSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "clean up the associated recorder when the pool router is stopped" in new RouterMetricsFixtures { val trackedRouter = createTestPoolRouter("stop-in-pool-router") - routerMetricsRecorderOf("user/stop-in-pool-router") should not be empty + val firstRecorder = routerMetricsRecorderOf("user/stop-in-pool-router").get + // Killing the router should remove it's RouterMetrics and registering again bellow should create a new one. val deathWatcher = TestProbe() deathWatcher.watch(trackedRouter) trackedRouter ! PoisonPill deathWatcher.expectTerminated(trackedRouter) - routerMetricsRecorderOf("user/stop-in-pool-router") shouldBe empty + routerMetricsRecorderOf("user/stop-in-pool-router").get shouldNot be theSameInstanceAs (firstRecorder) } "clean up the associated recorder when the group router is stopped" in new RouterMetricsFixtures { val trackedRouter = createTestPoolRouter("stop-in-group-router") - routerMetricsRecorderOf("user/stop-in-group-router") should not be empty + val firstRecorder = routerMetricsRecorderOf("user/stop-in-group-router").get + // Killing the router should remove it's RouterMetrics and registering again bellow should create a new one. val deathWatcher = TestProbe() deathWatcher.watch(trackedRouter) trackedRouter ! PoisonPill deathWatcher.expectTerminated(trackedRouter) - routerMetricsRecorderOf("user/stop-in-group-router") shouldBe empty + routerMetricsRecorderOf("user/stop-in-group-router").get shouldNot be theSameInstanceAs (firstRecorder) } } @@ -213,10 +203,10 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val buffer: LongBuffer = LongBuffer.allocate(10000) } - def routerMetricsRecorderOf(routerName: String): Option[RouterMetricsRecorder] = - Kamon(Metrics)(system).storage.get(RouterMetrics(routerName)).map(_.asInstanceOf[RouterMetricsRecorder]) + def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] = + Kamon(Metrics)(system).register(RouterMetrics, routerName).map(_.recorder) - def collectMetricsOf(routerName: String): Option[RouterMetricSnapshot] = { + def collectMetricsOf(routerName: String): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. routerMetricsRecorderOf(routerName).map(_.collect(collectionContext)) } @@ -255,16 +245,6 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with router } } - - trait ActorMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) - - def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) - } } class RouterMetricsTestActor extends Actor { |