From 29068fc70a3e5a17a630c2c7fff951572bb5fa21 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 3 Jul 2014 14:36:42 -0300 Subject: ! all: refactor the core metric recording instruments and accomodate UserMetrics This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module. --- kamon-core/src/main/java/kamon/util/Example.java | 8 + kamon-core/src/main/resources/META-INF/aop.xml | 22 +- kamon-core/src/main/resources/reference.conf | 43 ++-- .../ActorMessagePassingTracing.scala | 146 ----------- .../ActorSystemMessagePassingTracing.scala | 65 ----- .../akka/instrumentation/AskPatternTracing.scala | 54 ---- .../akka/instrumentation/DispatcherTracing.scala | 161 ------------ .../instrumentation/ActorLoggingTracing.scala | 47 ---- .../kamon/instrumentation/FutureTracing.scala | 47 ---- .../akka/ActorCellInstrumentation.scala | 130 ++++++++++ .../akka/ActorLoggingInstrumentation.scala | 47 ++++ .../akka/ActorSystemMessageInstrumentation.scala | 80 ++++++ .../akka/AskPatternInstrumentation.scala | 55 ++++ .../akka/DispatcherInstrumentation.scala | 163 ++++++++++++ .../AtomicHistogramFieldsAccessor.scala | 35 +++ .../scala/FutureInstrumentation.scala | 48 ++++ .../src/main/scala/kamon/metric/ActorMetrics.scala | 89 +++++++ .../scala/kamon/metric/DispatcherMetrics.scala | 88 +++++++ .../main/scala/kamon/metric/EntityMetrics.scala | 75 ++++++ .../main/scala/kamon/metric/MetricsExtension.scala | 110 ++++++++ kamon-core/src/main/scala/kamon/metric/Scale.scala | 31 +++ .../main/scala/kamon/metric/Subscriptions.scala | 128 ++++++++++ .../src/main/scala/kamon/metric/TraceMetrics.scala | 77 ++++++ .../src/main/scala/kamon/metric/UserMetrics.scala | 139 +++++++++++ .../scala/kamon/metric/instrument/Counter.scala | 59 +++++ .../main/scala/kamon/metric/instrument/Gauge.scala | 78 ++++++ .../scala/kamon/metric/instrument/Histogram.scala | 246 ++++++++++++++++++ .../kamon/metric/instrument/MinMaxCounter.scala | 116 +++++++++ .../src/main/scala/kamon/metric/package.scala | 34 +++ .../main/scala/kamon/metrics/ActorMetrics.scala | 70 ------ .../main/scala/kamon/metrics/CustomMetric.scala | 52 ---- .../scala/kamon/metrics/DispatcherMetrics.scala | 71 ------ .../src/main/scala/kamon/metrics/Metrics.scala | 121 --------- .../scala/kamon/metrics/MetricsExtension.scala | 104 -------- .../src/main/scala/kamon/metrics/Scale.scala | 31 --- .../main/scala/kamon/metrics/Subscriptions.scala | 129 ---------- .../main/scala/kamon/metrics/TraceMetrics.scala | 66 ----- .../instruments/ContinuousHdrRecorder.scala | 52 ---- .../metrics/instruments/CounterRecorder.scala | 38 --- .../kamon/metrics/instruments/HdrRecorder.scala | 78 ------ .../kamon/metrics/instruments/MinMaxCounter.scala | 58 ----- .../src/main/scala/kamon/metrics/package.scala | 39 --- .../scala/kamon/standalone/KamonStandalone.scala | 11 + .../src/main/scala/kamon/trace/TraceContext.scala | 8 +- .../src/main/scala/kamon/trace/TraceRecorder.scala | 4 +- .../akka/ActorCellInstrumentationSpec.scala | 87 +++++++ .../akka/ActorLoggingInstrumentationSpec.scala | 52 ++++ .../ActorSystemMessageInstrumentationSpec.scala | 172 +++++++++++++ .../akka/AskPatternInstrumentationSpec.scala | 67 +++++ .../scala/FutureInstrumentationSpec.scala | 63 +++++ .../test/scala/kamon/metric/ActorMetricsSpec.scala | 202 +++++++++++++++ .../scala/kamon/metric/DispatcherMetricsSpec.scala | 105 ++++++++ .../metric/TickMetricSnapshotBufferSpec.scala | 109 ++++++++ .../test/scala/kamon/metric/TraceMetricsSpec.scala | 92 +++++++ .../test/scala/kamon/metric/UserMetricsSpec.scala | 278 +++++++++++++++++++++ .../kamon/metric/instrument/CounterSpec.scala | 55 ++++ .../scala/kamon/metric/instrument/GaugeSpec.scala | 70 ++++++ .../kamon/metric/instrument/HistogramSpec.scala | 130 ++++++++++ .../metric/instrument/MinMaxCounterSpec.scala | 108 ++++++++ .../scala/kamon/metrics/ActorMetricsSpec.scala | 172 ------------- .../scala/kamon/metrics/CustomMetricSpec.scala | 78 ------ .../kamon/metrics/DispatcherMetricsSpec.scala | 105 -------- .../scala/kamon/metrics/MetricSnapshotSpec.scala | 72 ------ .../metrics/TickMetricSnapshotBufferSpec.scala | 81 ------ .../metrics/instrument/MinMaxCounterSpec.scala | 110 -------- .../kamon/trace/TraceContextManipulationSpec.scala | 95 +++++++ .../trace/instrumentation/ActorLoggingSpec.scala | 51 ---- .../ActorMessagePassingTracingSpec.scala | 85 ------- ...orSystemMessagePassingInstrumentationSpec.scala | 169 ------------- .../instrumentation/AskPatternTracingSpec.scala | 66 ----- .../trace/instrumentation/FutureTracingSpec.scala | 62 ----- .../src/main/scala/kamon/datadog/Datadog.scala | 2 +- .../scala/kamon/datadog/DatadogMetricsSender.scala | 47 ++-- .../kamon/datadog/DatadogMetricSenderSpec.scala | 54 ++-- .../main/scala/kamon/newrelic/CustomMetrics.scala | 6 +- .../scala/kamon/newrelic/MetricTranslator.scala | 2 +- .../src/main/scala/kamon/newrelic/NewRelic.scala | 8 +- .../kamon/newrelic/WebTransactionMetrics.scala | 9 +- .../src/main/scala/kamon/newrelic/package.scala | 32 +-- .../play/instrumentation/WSInstrumentation.scala | 5 +- .../scala/kamon/play/WSInstrumentationSpec.scala | 6 +- .../main/scala/test/SimpleRequestProcessor.scala | 7 +- .../can/client/ClientRequestInstrumentation.scala | 12 +- .../spray/ClientRequestInstrumentationSpec.scala | 15 +- .../spray/ServerRequestInstrumentationSpec.scala | 9 +- .../src/test/scala/kamon/spray/TestServer.scala | 11 +- .../src/main/scala/kamon/statsd/StatsD.scala | 2 +- .../scala/kamon/statsd/StatsDMetricsSender.scala | 35 +-- .../kamon/statsd/StatsDMetricSenderSpec.scala | 40 +-- project/Settings.scala | 2 +- 90 files changed, 3725 insertions(+), 2668 deletions(-) create mode 100644 kamon-core/src/main/java/kamon/util/Example.java delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala delete mode 100644 kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Scale.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Subscriptions.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/UserMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/package.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/Metrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/Scale.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/package.scala create mode 100644 kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala diff --git a/kamon-core/src/main/java/kamon/util/Example.java b/kamon-core/src/main/java/kamon/util/Example.java new file mode 100644 index 00000000..a5031182 --- /dev/null +++ b/kamon-core/src/main/java/kamon/util/Example.java @@ -0,0 +1,8 @@ +package kamon.util; + +public class Example { + + public static void main(String args[]) { + + } +} diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 3f7dd42d..e1edaed9 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -3,23 +3,23 @@ - - - - - - - + + + + + + + - - + + - + - + diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index d2830892..b7f5c70e 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -48,30 +48,33 @@ kamon { ] precision { + default-histogram-precision { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + + default-min-max-counter-precision { + refresh-interval = 100 milliseconds + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + + default-gauge-precision { + refresh-interval = 100 milliseconds + highest-trackable-value = 999999999 + significant-value-digits = 2 + } + + actor { - processing-time { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } - time-in-mailbox { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } - mailbox-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} } trace { - elapsed-time { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } - segment { - highest-trackable-value = 3600000000000 - significant-value-digits = 2 - } + elapsed-time = ${kamon.metrics.precision.default-histogram-precision} + segment = ${kamon.metrics.precision.default-histogram-precision} } dispatcher { diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala deleted file mode 100644 index 6db86828..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* =================================================== - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ - -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import kamon.trace._ -import kamon.metrics.{ ActorMetrics, Metrics } -import kamon.Kamon -import kamon.metrics.ActorMetrics.ActorMetricRecorder -import kamon.metrics.instruments.MinMaxCounter -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement - -@Aspect -class BehaviourInvokeTracing { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.metricIdentity = metricIdentity - cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - if (cellWithMetrics.actorMetricsRecorder.isDefined) { - cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - cellWithMetrics.actorMetricsRecorder.map { am ⇒ - import am.mailboxSize._ - val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect() - - record(min) - record(max) - record(current) - } - } - } - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - try { - TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) - cellWithMetrics.queueSize.decrement() - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") - def sendingMessageToActorCell(cell: ActorCell): Unit = {} - - @After("sendingMessageToActorCell(cell)") - def afterSendMessageToActorCell(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { p ⇒ - cellWithMetrics.mailboxSizeCollectorCancellable.cancel() - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errorCounter.record(1L) - } - } -} - -trait ActorCellMetrics { - var metricIdentity: ActorMetrics = _ - var actorMetricsRecorder: Option[ActorMetricRecorder] = _ - var mailboxSizeCollectorCancellable: Cancellable = _ - val queueSize = MinMaxCounter() -} - -@Aspect -class ActorCellMetricsMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} -} - -@Aspect -class EnvelopeTraceContextMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala deleted file mode 100644 index 7d03d946..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala +++ /dev/null @@ -1,65 +0,0 @@ -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceRecorder, TraceContextAware } - -@Aspect -class SystemMessageTraceContextMixin { - - @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -@Aspect -class RepointableActorRefTraceContextMixin { - - @DeclareMixin("akka.actor.RepointableActorRef") - def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } - - @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") - def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} - - @Around("repointableActorRefCreation(repointableActorRef)") - def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withTraceContext(repointableActorRef.traceContext) { - pjp.proceed() - } - } - -} - -@Aspect -class ActorSystemMessagePassingTracing { - - @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") - def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} - - @Around("systemMessageProcessing(messages)") - def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { - if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withTraceContext(ctx)(pjp.proceed()) - - } else pjp.proceed() - } -} diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala deleted file mode 100644 index 31ec92a4..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } -import akka.event.Logging.Warning -import scala.compat.Platform.EOL -import akka.actor.ActorRefProvider -import akka.pattern.{ AskTimeoutException, PromiseActorRef } -import kamon.trace.Trace -import kamon.Kamon - -@Aspect -class AskPatternTracing { - - class StackTraceCaptureException extends Throwable - - @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *, *)", argNames = "provider") - def promiseActorRefApply(provider: ActorRefProvider): Unit = {} - - @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") - def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { - val system = promiseActor.provider.guardian.underlying.system - val traceExtension = Kamon(Trace)(system) - - if (traceExtension.enableAskPatternTracing) { - val future = promiseActor.result.future - implicit val ec = system.dispatcher - val stack = new StackTraceCaptureException - - future onFailure { - case timeout: AskTimeoutException ⇒ - val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) - - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing], - "Timeout triggered for ask pattern registered at: " + stackString)) - } - } - } -} diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala deleted file mode 100644 index 60cc4ddf..00000000 --- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.instrumentation - -import org.aspectj.lang.annotation._ -import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher } -import kamon.metrics.{ Metrics, DispatcherMetrics } -import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder -import kamon.Kamon -import akka.actor.{ Cancellable, ActorSystemImpl } -import scala.concurrent.forkjoin.ForkJoinPool -import java.util.concurrent.ThreadPoolExecutor -import java.lang.reflect.Method -import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement - -@Aspect -class DispatcherTracing { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricsMixin { - - @DeclareMixin("akka.dispatch.Dispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMessageMetrics { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala deleted file mode 100644 index 85d39d3e..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceContextAware, TraceRecorder } - -@Aspect -class ActorLoggingTracing { - - @DeclareMixin("akka.event.Logging.LogEvent+") - def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") - def logEventCreation(event: TraceContextAware): Unit = {} - - @After("logEventCreation(event)") - def captureTraceContext(event: TraceContextAware): Unit = { - // Force initialization of TraceContextAware - event.traceContext - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withTraceContext(logEvent.traceContext) { - pjp.proceed() - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala deleted file mode 100644 index 634c94a1..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ TraceContextAware, TraceRecorder } - -@Aspect -class FutureTracing { - - @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default - - @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") - def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} - - @After("futureRelatedRunnableCreation(runnable)") - def afterCreation(runnable: TraceContextAware): Unit = { - // Force traceContext initialization. - runnable.traceContext - } - - @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") - def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} - - @Around("futureRelatedRunnableExecution(runnable)") - def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { - TraceRecorder.withTraceContext(runnable.traceContext) { - pjp.proceed() - } - } - -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala new file mode 100644 index 00000000..5fce4555 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -0,0 +1,130 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.actor._ +import akka.dispatch.{ Envelope, MessageDispatcher } +import kamon.Kamon +import kamon.metric.ActorMetrics.ActorMetricsRecorder +import kamon.metric.{ ActorMetrics, Metrics } +import kamon.trace._ +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorCellInstrumentation { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") + def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + + val metricsExtension = Kamon(Metrics)(system) + val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellWithMetrics.metricIdentity = metricIdentity + cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + } + + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val timestampBeforeProcessing = System.nanoTime() + val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + try { + TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { + pjp.proceed() + } + } finally { + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ + am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) + am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) + am.mailboxSize.decrement() + } + } + } + + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") + def sendingMessageToActorCell(cell: ActorCell): Unit = {} + + @After("sendingMessageToActorCell(cell)") + def afterSendMessageToActorCell(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) + } + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: ActorCell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellWithMetrics.actorMetricsRecorder.map { p ⇒ + cellWithMetrics.mailboxSizeCollectorCancellable.cancel() + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + } + } + + @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") + def actorInvokeFailure(cell: ActorCell): Unit = {} + + @Before("actorInvokeFailure(cell)") + def beforeInvokeFailure(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + + cellWithMetrics.actorMetricsRecorder.map { + am ⇒ am.errors.increment() + } + } +} + +trait ActorCellMetrics { + var metricIdentity: ActorMetrics = _ + var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var mailboxSizeCollectorCancellable: Cancellable = _ +} + +@Aspect +class ActorCellMetricsIntoActorCellMixin { + + @DeclareMixin("akka.actor.ActorCell") + def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} +} + +@Aspect +class TraceContextIntoEnvelopeMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..ee9d442f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala @@ -0,0 +1,47 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorLoggingInstrumentation { + + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") + def logEventCreation(event: TraceContextAware): Unit = {} + + @After("logEventCreation(event)") + def captureTraceContext(event: TraceContextAware): Unit = { + // Force initialization of TraceContextAware + event.traceContext + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { + TraceRecorder.withTraceContext(logEvent.traceContext) { + pjp.proceed() + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala new file mode 100644 index 00000000..9b6b6866 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala @@ -0,0 +1,80 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.dispatch.sysmsg.EarliestFirstSystemMessageList +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class ActorSystemMessageInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") + def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} + + @Around("systemMessageProcessing(messages)") + def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { + if (messages.nonEmpty) { + val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext + TraceRecorder.withTraceContext(ctx)(pjp.proceed()) + + } else pjp.proceed() + } +} + +@Aspect +class TraceContextIntoSystemMessageMixin { + + @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") + def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} + +@Aspect +class TraceContextIntoRepointableActorRefMixin { + + @DeclareMixin("akka.actor.RepointableActorRef") + def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") + def envelopeCreation(ctx: TraceContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } + + @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") + def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} + + @Around("repointableActorRefCreation(repointableActorRef)") + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { + TraceRecorder.withTraceContext(repointableActorRef.traceContext) { + pjp.proceed() + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala new file mode 100644 index 00000000..3bf13ce2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala @@ -0,0 +1,55 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.actor.ActorRefProvider +import akka.event.Logging.Warning +import akka.pattern.{ AskTimeoutException, PromiseActorRef } +import kamon.Kamon +import kamon.trace.Trace +import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut } + +import scala.compat.Platform.EOL + +@Aspect +class AskPatternInstrumentation { + + class StackTraceCaptureException extends Throwable + + @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *, *)", argNames = "provider") + def promiseActorRefApply(provider: ActorRefProvider): Unit = {} + + @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") + def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { + val system = promiseActor.provider.guardian.underlying.system + val traceExtension = Kamon(Trace)(system) + + if (traceExtension.enableAskPatternTracing) { + val future = promiseActor.result.future + implicit val ec = system.dispatcher + val stack = new StackTraceCaptureException + + future onFailure { + case timeout: AskTimeoutException ⇒ + val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) + + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], + "Timeout triggered for ask pattern registered at: " + stackString)) + } + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala new file mode 100644 index 00000000..db366e8c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala @@ -0,0 +1,163 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import java.lang.reflect.Method +import java.util.concurrent.ThreadPoolExecutor + +import akka.actor.{ ActorSystemImpl, Cancellable } +import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } +import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement +import kamon.Kamon +import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder +import kamon.metric.{ DispatcherMetrics, Metrics } +import org.aspectj.lang.annotation._ + +import scala.concurrent.forkjoin.ForkJoinPool + +@Aspect +class DispatcherInstrumentation { + + @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") + def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} + + @Before("onActorSystemStartup(dispatchers, system)") + def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { + val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] + currentDispatchers.actorSystem = system + } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") + def onDispatchersLookup(dispatchers: Dispatchers) = {} + + @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") + def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { + val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] + + dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem + } + + @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") + def onCreateExecutorService(): Unit = {} + + @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") + def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} + + @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") + def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} + + @After("onDispatcherStartup(dispatcher)") + def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { + + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] + val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) + val metricIdentity = DispatcherMetrics(dispatcher.id) + + dispatcherWithMetrics.metricIdentity = metricIdentity + dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) + + if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { + dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { + dispatcherWithMetrics.dispatcherMetricsRecorder.map { + dm ⇒ + val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = + DispatcherMetricsCollector.collect(dispatcher) + + dm.maximumPoolSize.record(maximumPoolSize) + dm.runningThreadCount.record(runningThreadCount) + dm.queueTaskCount.record(queueTaskCount) + dm.poolSize.record(poolSize) + } + } + } + } + + @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") + def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} + + @After("onDispatcherShutdown(dispatcher)") + def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] + + dispatcherWithMetrics.dispatcherMetricsRecorder.map { + dispatcher ⇒ + dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() + Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) + } + } +} + +@Aspect +class DispatcherMetricCollectionInfoIntoDispatcherMixin { + + @DeclareMixin("akka.dispatch.Dispatcher") + def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} +} + +trait DispatcherMetricCollectionInfo { + var metricIdentity: DispatcherMetrics = _ + var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ + var dispatcherCollectorCancellable: Cancellable = _ + var actorSystem: ActorSystemImpl = _ +} + +trait DispatchersWithActorSystem { + var actorSystem: ActorSystemImpl = _ +} + +object DispatcherMetricsCollector { + + case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) + + private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { + DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, + (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) + } + + private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { + DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) + } + + private val executorServiceMethod: Method = { + // executorService is protected + val method = classOf[Dispatcher].getDeclaredMethod("executorService") + method.setAccessible(true) + method + } + + def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { + dispatcher match { + case x: Dispatcher ⇒ { + val executor = executorServiceMethod.invoke(x) match { + case delegate: ExecutorServiceDelegate ⇒ delegate.executor + case other ⇒ other + } + + executor match { + case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) + case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) + case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + } + } + case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala new file mode 100644 index 00000000..e79090a8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala @@ -0,0 +1,35 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } + +trait AtomicHistogramFieldsAccessor { + self: AtomicHistogram ⇒ + + def countsArray(): AtomicLongArray = self.counts + + def unitMagnitude(): Int = self.unitMagnitude + + def subBucketHalfCount(): Int = self.subBucketHalfCount + + def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude +} + +object AtomicHistogramFieldsAccessor { + def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala new file mode 100644 index 00000000..d8f2b620 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala @@ -0,0 +1,48 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.scala + +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class FutureInstrumentation { + + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default + + @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") + def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} + + @After("futureRelatedRunnableCreation(runnable)") + def afterCreation(runnable: TraceContextAware): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") + def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} + + @Around("futureRelatedRunnableExecution(runnable)") + def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { + TraceRecorder.withTraceContext(runnable.traceContext) { + pjp.proceed() + } + } + +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala new file mode 100644 index 00000000..bb412f79 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } + +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, + errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.collect(context), + timeInMailbox.collect(context), + mailboxSize.collect(context), + errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + mailboxSize.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, + mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = ActorMetricSnapshot + + def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + mailboxSize.merge(that.mailboxSize, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (ProcessingTime -> processingTime), + (MailboxSize -> mailboxSize), + (TimeInMailbox -> timeInMailbox), + (Errors -> errors)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = ActorMetricsRecorder + + def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { + val settings = config.getConfig("precision.actor") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + val mailboxSizeConfig = settings.getConfig("mailbox-size") + + new ActorMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + MinMaxCounter.fromConfig(mailboxSizeConfig, system), + Counter()) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala new file mode 100644 index 00000000..fbce783c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala @@ -0,0 +1,88 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ Histogram, HdrHistogram } + +case class DispatcherMetrics(name: String) extends MetricGroupIdentity { + val category = DispatcherMetrics +} + +object DispatcherMetrics extends MetricGroupCategory { + val name = "dispatcher" + + case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } + case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } + case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } + case object PoolSize extends MetricIdentity { val name = "pool-size" } + + case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, + queueTaskCount: Histogram, poolSize: Histogram) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.collect(context), + runningThreadCount.collect(context), + queueTaskCount.collect(context), + poolSize.collect(context)) + + def cleanup: Unit = {} + + } + + case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, + queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = DispatcherMetricSnapshot + + def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.merge(that.maximumPoolSize, context), + runningThreadCount.merge(that.runningThreadCount, context), + queueTaskCount.merge(that.queueTaskCount, context), + poolSize.merge(that.poolSize, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (MaximumPoolSize -> maximumPoolSize), + (RunningThreadCount -> runningThreadCount), + (QueueTaskCount -> queueTaskCount), + (PoolSize -> poolSize)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = DispatcherMetricRecorder + + def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { + val settings = config.getConfig("precision.dispatcher") + + val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") + val runningThreadCountConfig = settings.getConfig("running-thread-count") + val queueTaskCountConfig = settings.getConfig("queued-task-count") + val poolSizeConfig = settings.getConfig("pool-size") + + new DispatcherMetricRecorder( + Histogram.fromConfig(maximumPoolSizeConfig), + Histogram.fromConfig(runningThreadCountConfig), + Histogram.fromConfig(queueTaskCountConfig), + Histogram.fromConfig(poolSizeConfig)) + } + } +} + diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala new file mode 100644 index 00000000..325dd216 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala @@ -0,0 +1,75 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer } +import akka.actor.ActorSystem +import com.typesafe.config.Config + +trait MetricGroupCategory { + def name: String +} + +trait MetricGroupIdentity { + def name: String + def category: MetricGroupCategory +} + +trait MetricIdentity { + def name: String +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def default: CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } +} + +trait MetricGroupRecorder { + def collect(context: CollectionContext): MetricGroupSnapshot + def cleanup: Unit +} + +trait MetricSnapshot { + type SnapshotType + + def merge(that: SnapshotType, context: CollectionContext): SnapshotType +} + +trait MetricGroupSnapshot { + type GroupSnapshotType + + def metrics: Map[MetricIdentity, MetricSnapshot] + def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType +} + +private[kamon] trait MetricRecorder { + type SnapshotType <: MetricSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait MetricGroupFactory { + type GroupRecorder <: MetricGroupRecorder + def create(config: Config, system: ActorSystem): GroupRecorder +} + diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala new file mode 100644 index 00000000..1025f0de --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer, ByteBuffer } + +import scala.collection.concurrent.TrieMap +import akka.actor._ +import com.typesafe.config.Config +import kamon.util.GlobPathFilter +import kamon.Kamon +import akka.actor +import kamon.metric.Metrics.MetricGroupFilter +import kamon.metric.Subscriptions.Subscribe +import java.util.concurrent.TimeUnit + +class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") + + /** Configured Dispatchers */ + val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) + val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) + + /** Configuration Settings */ + val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + + val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() + val filters = loadFilters(metricsExtConfig) + lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") + + def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { + if (shouldTrack(identity)) + Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) + else + None + } + + def unregister(identity: MetricGroupIdentity): Unit = { + storage.remove(identity) + } + + def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { + subscriptions.tell(Subscribe(category, selection, permanently), receiver) + } + + def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { + // TODO: Improve the way in which we are getting the context. + val context = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(50000) + } + (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap + } + + def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { + import scala.concurrent.duration._ + + system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { + body + }(gaugeRecordingsDispatcher) + } + + private def shouldTrack(identity: MetricGroupIdentity): Boolean = { + filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) + } + + def loadFilters(config: Config): Map[String, MetricGroupFilter] = { + import scala.collection.JavaConverters._ + + val filters = config.getObjectList("filters").asScala + + val allFilters = + for ( + filter ← filters; + entry ← filter.entrySet().asScala + ) yield { + val key = entry.getKey + val keyBasedConfig = entry.getValue.atKey(key) + + val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList + val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + + (key, MetricGroupFilter(includes, excludes)) + } + + allFilters.toMap + } +} + +object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) + + case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { + def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala new file mode 100644 index 00000000..2f27c1a3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Scale.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +class Scale(val numericValue: Double) extends AnyVal + +object Scale { + val Nano = new Scale(1E-9) + val Micro = new Scale(1E-6) + val Milli = new Scale(1E-3) + val Unit = new Scale(1) + val Kilo = new Scale(1E3) + val Mega = new Scale(1E6) + val Giga = new Scale(1E9) + + def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue +} diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala new file mode 100644 index 00000000..a9f4c721 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala @@ -0,0 +1,128 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.{ Props, ActorRef, Actor } +import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } +import kamon.util.GlobPathFilter +import scala.concurrent.duration.{ FiniteDuration, Duration } +import java.util.concurrent.TimeUnit +import kamon.Kamon +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) + case FlushMetrics ⇒ flush() + } + + def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if (permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for ((filter, receivers) ← subscriptions) yield { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } +} + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext = CollectionContext.default + + def receive = empty + + def empty: Actor.Receive = { + case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) + case FlushBuffer ⇒ // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) ⇒ + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become (buffering(combinedSnapshot)) + + case FlushBuffer ⇒ + receiver ! buffered + context become (empty) + + } + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala new file mode 100644 index 00000000..1ee1eab4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import kamon.metric.instrument.{ Histogram } + +import scala.collection.concurrent.TrieMap +import com.typesafe.config.Config + +case class TraceMetrics(name: String) extends MetricGroupIdentity { + val category = TraceMetrics +} + +object TraceMetrics extends MetricGroupCategory { + val name = "trace" + + case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } + case class HttpClientRequest(name: String) extends MetricIdentity + + case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) + extends MetricGroupRecorder { + + private val segments = TrieMap[MetricIdentity, Histogram]() + + def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + + def collect(context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot( + elapsedTime.collect(context), + segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + + def cleanup: Unit = {} + } + + case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = TraceMetricsSnapshot + + def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) + + def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = TraceMetricRecorder + + def create(config: Config, system: ActorSystem): TraceMetricRecorder = { + + val settings = config.getConfig("precision.trace") + val elapsedTimeConfig = settings.getConfig("elapsed-time") + val segmentConfig = settings.getConfig("segment") + + new TraceMetricRecorder( + Histogram.fromConfig(elapsedTimeConfig), + () ⇒ Histogram.fromConfig(segmentConfig)) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala new file mode 100644 index 00000000..dea03968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -0,0 +1,139 @@ +package kamon.metric + +import akka.actor +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import com.typesafe.config.Config +import kamon.Kamon +import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + lazy val userMetricsRecorder = Kamon(Metrics)(system).register(UserMetrics, UserMetrics.Factory).get + + def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + userMetricsRecorder.buildHistogram(name, precision, highestTrackableValue) + + def registerHistogram(name: String): Histogram = + userMetricsRecorder.buildHistogram(name) + + def registerCounter(name: String): Counter = + userMetricsRecorder.buildCounter(name) + + def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + userMetricsRecorder.buildMinMaxCounter(name, precision, highestTrackableValue, refreshInterval) + } + + def registerMinMaxCounter(name: String): MinMaxCounter = + userMetricsRecorder.buildMinMaxCounter(name) + + def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name)(currentValueCollector) + + def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name, precision, highestTrackableValue, refreshInterval, currentValueCollector) +} + +object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider with MetricGroupIdentity { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + + val name: String = "user-metrics-recorder" + val category = new MetricGroupCategory { + val name: String = "user-metrics" + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = UserMetricsRecorder + def create(config: Config, system: ActorSystem): UserMetricsRecorder = new UserMetricsRecorder(system) + } + + class UserMetricsRecorder(system: ActorSystem) extends MetricGroupRecorder { + val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") + val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") + val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") + val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") + + val histograms = TrieMap[String, Histogram]() + val counters = TrieMap[String, Counter]() + val minMaxCounters = TrieMap[String, MinMaxCounter]() + val gauges = TrieMap[String, Gauge]() + + def buildHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + histograms.getOrElseUpdate(name, Histogram(highestTrackableValue, precision, Scale.Unit)) + + def buildHistogram(name: String): Histogram = + histograms.getOrElseUpdate(name, Histogram.fromConfig(defaultHistogramPrecisionConfig)) + + def buildCounter(name: String): Counter = + counters.getOrElseUpdate(name, Counter()) + + def buildMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + minMaxCounters.getOrElseUpdate(name, MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) + } + + def buildMinMaxCounter(name: String): MinMaxCounter = + minMaxCounters.getOrElseUpdate(name, MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) + + def buildGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration, currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) + + def buildGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) + + def collect(context: CollectionContext): UserMetricsSnapshot = { + val histogramSnapshots = histograms.map { + case (name, histogram) ⇒ + (UserHistogram(name), histogram.collect(context)) + } toMap + + val counterSnapshots = counters.map { + case (name, counter) ⇒ + (UserCounter(name), counter.collect(context)) + } toMap + + val minMaxCounterSnapshots = minMaxCounters.map { + case (name, minMaxCounter) ⇒ + (UserMinMaxCounter(name), minMaxCounter.collect(context)) + } toMap + + val gaugeSnapshots = gauges.map { + case (name, gauge) ⇒ + (UserGauge(name), gauge.collect(context)) + } toMap + + UserMetricsSnapshot(histogramSnapshots, counterSnapshots, minMaxCounterSnapshots, gaugeSnapshots) + } + + def cleanup: Unit = {} + } + + case class UserHistogram(name: String) extends MetricIdentity + case class UserCounter(name: String) extends MetricIdentity + case class UserMinMaxCounter(name: String) extends MetricIdentity + case class UserGauge(name: String) extends MetricIdentity + + case class UserMetricsSnapshot(histograms: Map[UserHistogram, Histogram.Snapshot], + counters: Map[UserCounter, Counter.Snapshot], + minMaxCounters: Map[UserMinMaxCounter, Histogram.Snapshot], + gauges: Map[UserGauge, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = UserMetricsSnapshot + + def merge(that: UserMetricsSnapshot, context: CollectionContext): UserMetricsSnapshot = + UserMetricsSnapshot( + combineMaps(histograms, that.histograms)((l, r) ⇒ l.merge(r, context)), + combineMaps(counters, that.counters)((l, r) ⇒ l.merge(r, context)), + combineMaps(minMaxCounters, that.minMaxCounters)((l, r) ⇒ l.merge(r, context)), + combineMaps(gauges, that.gauges)((l, r) ⇒ l.merge(r, context))) + + def metrics: Map[MetricIdentity, MetricSnapshot] = histograms ++ counters ++ minMaxCounters ++ gauges + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala new file mode 100644 index 00000000..b592bcd3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -0,0 +1,59 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import jsr166e.LongAdder +import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } + +trait Counter extends MetricRecorder { + type SnapshotType = Counter.Snapshot + + def increment(): Unit + def increment(times: Long): Unit +} + +object Counter { + + def apply(): Counter = new LongAdderCounter + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Counter.Snapshot + + def count: Long + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + } +} + +class LongAdderCounter extends Counter { + private val counter = new LongAdder + + def increment(): Unit = counter.increment() + + def increment(times: Long): Unit = { + if (times < 0) + throw new UnsupportedOperationException("Counters cannot be decremented") + counter.add(times) + } + + def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset()) + + def cleanup: Unit = {} +} + +case class CounterSnapshot(count: Long) extends Counter.Snapshot { + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala new file mode 100644 index 00000000..1efff2bc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -0,0 +1,78 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ Cancellable, ActorSystem } +import com.typesafe.config.Config +import kamon.metric.{ CollectionContext, Scale, MetricRecorder } + +import scala.concurrent.duration.FiniteDuration + +trait Gauge extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Gauge { + + trait CurrentValueCollector { + def currentValue: Long + } + + def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + gauge.refreshValue() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge + } + + def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = + fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + + def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { + val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") + fromConfig(config, system)(currentValueCollector) + } + + def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector) + } + + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} + +class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + def record(value: Long): Unit = underlyingHistogram.record(value) + + def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala new file mode 100644 index 00000000..9ae077f4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -0,0 +1,246 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import java.nio.LongBuffer +import com.typesafe.config.Config +import org.HdrHistogram.AtomicHistogramFieldsAccessor +import org.HdrHistogram.AtomicHistogram +import kamon.metric._ + +trait Histogram extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Histogram { + + def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = + new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) + + def fromConfig(config: Config): Histogram = { + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + + new HdrHistogram(1L, highest, significantDigits) + } + + object HighestTrackableValue { + val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L + } + + case class Precision(significantDigits: Int) + object Precision { + val Low = Precision(1) + val Normal = Precision(2) + val Fine = Precision(3) + } + + trait Record { + def level: Long + def count: Long + + private[kamon] def rawCompactRecord: Long + } + + case class MutableRecord(var level: Long, var count: Long) extends Record { + var rawCompactRecord: Long = 0L + } + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Histogram.Snapshot + + def isEmpty: Boolean = numberOfMeasurements == 0 + def scale: Scale + def numberOfMeasurements: Long + def min: Long + def max: Long + def recordsIterator: Iterator[Record] + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot + } +} + +/** + * This implementation is meant to be used for real time data collection where data snapshots are taken often over time. + * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) + extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) + with Histogram with AtomicHistogramFieldsAccessor { + + import AtomicHistogramFieldsAccessor.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def record(value: Long, count: Long): Unit = recordValueWithCount(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = { + import context.buffer + buffer.clear() + val nrOfMeasurements = writeSnapshotTo(buffer) + + buffer.flip() + + val measurementsArray = Array.ofDim[Long](buffer.limit()) + buffer.get(measurementsArray, 0, measurementsArray.length) + new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + } + + def cleanup: Unit = {} + + private def writeSnapshotTo(buffer: LongBuffer): Long = { + val counts = countsArray() + val countsLength = counts.length() + + var nrOfMeasurements = 0L + var index = 0L + while (index < countsLength) { + val countAtIndex = counts.getAndSet(index.toInt, 0L) + + if (countAtIndex > 0) { + buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex)) + nrOfMeasurements += countAtIndex + } + + index += 1 + } + + reestablishTotalCount(nrOfMeasurements) + nrOfMeasurements + } + + private def reestablishTotalCount(diff: Long): Unit = { + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - diff + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + } + +} + +class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, + subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { + + def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) + def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1)) + + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { + if (that.isEmpty) this else if (this.isEmpty) that else { + import context.buffer + buffer.clear() + + val selfIterator = recordsIterator + val thatIterator = that.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L + + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } + + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } + } + + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) + } + + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) + } + } + + @inline private def mergeCompactRecords(left: Long, right: Long): Long = { + val index = left >> 48 + val leftCount = countFromCompactRecord(left) + val rightCount = countFromCompactRecord(right) + + CompactHdrSnapshot.compactRecord(index, leftCount + rightCount) + } + + @inline private def levelFromCompactRecord(compactRecord: Long): Long = { + val countsArrayIndex = (compactRecord >> 48).toInt + var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + + @inline private def countFromCompactRecord(compactRecord: Long): Long = + compactRecord & CompactHdrSnapshot.CompactRecordCountMask + + def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] { + var currentIndex = 0 + val mutableRecord = Histogram.MutableRecord(0, 0) + + override def hasNext: Boolean = currentIndex < compactRecords.length + + override def next(): Histogram.Record = { + if (hasNext) { + val measurement = compactRecords(currentIndex) + mutableRecord.rawCompactRecord = measurement + mutableRecord.level = levelFromCompactRecord(measurement) + mutableRecord.count = countFromCompactRecord(measurement) + currentIndex += 1 + + mutableRecord + } else { + throw new IllegalStateException("The iterator has already been consumed.") + } + } + } +} + +object CompactHdrSnapshot { + val CompactRecordCountMask = 0xFFFFFFFFFFFFL + + def compactRecord(index: Long, count: Long): Long = (index << 48) | count +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala new file mode 100644 index 00000000..471e7bd4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -0,0 +1,116 @@ +package kamon.metric.instrument + +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import java.lang.Math.abs +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference +import akka.actor.{ ActorSystem, Cancellable } +import com.typesafe.config.Config +import jsr166e.LongMaxUpdater +import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.util.PaddedAtomicLong +import scala.concurrent.duration.FiniteDuration + +trait MinMaxCounter extends MetricRecorder { + override type SnapshotType = Histogram.Snapshot + + def increment(): Unit + def increment(times: Long): Unit + def decrement() + def decrement(times: Long) +} + +object MinMaxCounter { + + def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem): MinMaxCounter = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + minMaxCounter.refreshValues() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) + minMaxCounter + } + + def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) + } +} + +class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { + private val min = new LongMaxUpdater + private val max = new LongMaxUpdater + private val sum = new PaddedAtomicLong + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + min.update(0L) + max.update(0L) + + def increment(): Unit = increment(1L) + + def increment(times: Long): Unit = { + val currentValue = sum.addAndGet(times) + max.update(currentValue) + } + + def decrement(): Unit = decrement(1L) + + def decrement(times: Long): Unit = { + val currentValue = sum.addAndGet(-times) + min.update(-currentValue) + } + + def collect(context: CollectionContext): Histogram.Snapshot = { + refreshValues() + underlyingHistogram.collect(context) + } + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValues(): Unit = { + val currentValue = { + val value = sum.get() + if (value < 0) 0 else value + } + + val currentMin = { + val minAbs = abs(min.maxThenReset()) + if (minAbs <= currentValue) minAbs else 0 + } + + underlyingHistogram.record(currentValue) + underlyingHistogram.record(currentMin) + underlyingHistogram.record(max.maxThenReset()) + + max.update(currentValue) + min.update(-currentValue) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala new file mode 100644 index 00000000..43166058 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/package.scala @@ -0,0 +1,34 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import scala.annotation.tailrec +import com.typesafe.config.Config + +package object metric { + + @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { + if (right.isEmpty) + left + else { + val (key, rightValue) = right.head + val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + + combineMaps(left.updated(key, value), right.tail)(valueMerger) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala deleted file mode 100644 index 9e19dced..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import com.typesafe.config.Config -import kamon.metrics.instruments.CounterRecorder -import org.HdrHistogram.HdrRecorder - -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name, tag = "processing-time" } - case object MailboxSize extends MetricIdentity { val name, tag = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name, tag = "time-in-mailbox" } - case object ErrorCounter extends MetricIdentity { val name, tag = "errors" } - - case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder, errorCounter: MetricRecorder) - extends MetricGroupRecorder { - - def collect: MetricGroupSnapshot = { - ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect(), errorCounter.collect()) - } - } - - case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike, errorCounter: MetricSnapshotLike) - extends MetricGroupSnapshot { - - val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (ErrorCounter -> errorCounter)) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = ActorMetricRecorder - - def create(config: Config): ActorMetricRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) - val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) - val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox")) - - new ActorMetricRecorder( - HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano), - HdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit), - HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano), - CounterRecorder()) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala deleted file mode 100644 index cd0afac1..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import kamon.metrics.instruments.ContinuousHdrRecorder -import org.HdrHistogram.HdrRecorder -import com.typesafe.config.Config - -case class CustomMetric(name: String) extends MetricGroupIdentity { - val category = CustomMetric -} - -object CustomMetric extends MetricGroupCategory { - val name = "custom-metric" - val RecordedValues = new MetricIdentity { val name, tag = "recorded-values" } - - def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) = - new MetricGroupFactory { - - type GroupRecorder = CustomMetricRecorder - - def create(config: Config): CustomMetricRecorder = { - val recorder = - if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) - else HdrRecorder(highestTrackableValue, significantValueDigits, scale) - - new CustomMetricRecorder(RecordedValues, recorder) - } - } - - class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder) - extends MetricGroupRecorder { - - def record(value: Long): Unit = underlyingRecorder.record(value) - - def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect()))) - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala deleted file mode 100644 index f41e0c3f..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import com.typesafe.config.Config -import org.HdrHistogram.HdrRecorder - -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} - -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" - - case object MaximumPoolSize extends MetricIdentity { val name, tag = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name, tag = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name, tag = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name, tag = "pool-size" } - - case class DispatcherMetricRecorder(maximumPoolSize: MetricRecorder, runningThreadCount: MetricRecorder, queueTaskCount: MetricRecorder, poolSize: MetricRecorder) - extends MetricGroupRecorder { - - def collect: MetricGroupSnapshot = { - DispatcherMetricSnapshot(maximumPoolSize.collect(), runningThreadCount.collect(), queueTaskCount.collect(), poolSize.collect()) - } - } - - case class DispatcherMetricSnapshot(maximumPoolSize: MetricSnapshotLike, runningThreadCount: MetricSnapshotLike, queueTaskCount: MetricSnapshotLike, poolSize: MetricSnapshotLike) - extends MetricGroupSnapshot { - - val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val MaximumPoolSizeConfig = extractPrecisionConfig(settings.getConfig("maximum-pool-size")) - val RunningThreadCountConfig = extractPrecisionConfig(settings.getConfig("running-thread-count")) - val QueueTaskCountConfig = extractPrecisionConfig(settings.getConfig("queued-task-count")) - val PoolSizeConfig = extractPrecisionConfig(settings.getConfig("pool-size")) - - new DispatcherMetricRecorder( - HdrRecorder(MaximumPoolSizeConfig.highestTrackableValue, MaximumPoolSizeConfig.significantValueDigits, Scale.Unit), - HdrRecorder(RunningThreadCountConfig.highestTrackableValue, RunningThreadCountConfig.significantValueDigits, Scale.Unit), - HdrRecorder(QueueTaskCountConfig.highestTrackableValue, QueueTaskCountConfig.significantValueDigits, Scale.Unit), - HdrRecorder(PoolSizeConfig.highestTrackableValue, PoolSizeConfig.significantValueDigits, Scale.Unit)) - } - } -} - diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala deleted file mode 100644 index f07bf38e..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import annotation.tailrec -import com.typesafe.config.Config -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.InstrumentType - -trait MetricGroupCategory { - def name: String -} - -trait MetricGroupIdentity { - def name: String - def category: MetricGroupCategory -} - -trait MetricIdentity { - def name: String - def tag: String -} - -trait MetricGroupRecorder { - def collect: MetricGroupSnapshot -} - -trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshotLike] -} - -case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - -trait MetricRecorder { - def record(value: Long) - def collect(): MetricSnapshotLike -} - -object InstrumentTypes { - sealed trait InstrumentType - case object Histogram extends InstrumentType - case object Gauge extends InstrumentType - case object Counter extends InstrumentType -} - -trait MetricSnapshotLike { - def instrumentType: InstrumentType - def numberOfMeasurements: Long - def scale: Scale - def measurements: Vector[Measurement] - - def max: Long = measurements.lastOption.map(_.value).getOrElse(0) - def min: Long = measurements.headOption.map(_.value).getOrElse(0) - - def merge(that: MetricSnapshotLike): MetricSnapshotLike = { - val mergedMeasurements = Vector.newBuilder[Measurement] - - @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { - if (left.nonEmpty && right.nonEmpty) { - val leftValue = left.head - val rightValue = right.head - - if (rightValue.value == leftValue.value) { - val merged = rightValue.merge(leftValue) - mergedMeasurements += merged - go(left.tail, right.tail, totalNrOfMeasurements + merged.count) - } else { - if (leftValue.value < rightValue.value) { - mergedMeasurements += leftValue - go(left.tail, right, totalNrOfMeasurements + leftValue.count) - } else { - mergedMeasurements += rightValue - go(left, right.tail, totalNrOfMeasurements + rightValue.count) - } - } - } else { - if (left.isEmpty && right.nonEmpty) { - mergedMeasurements += right.head - go(left, right.tail, totalNrOfMeasurements + right.head.count) - } else { - if (left.nonEmpty && right.isEmpty) { - mergedMeasurements += left.head - go(left.tail, right, totalNrOfMeasurements + left.head.count) - } else totalNrOfMeasurements - } - } - } - - val totalNrOfMeasurements = go(measurements, that.measurements, 0) - MetricSnapshot(instrumentType, totalNrOfMeasurements, scale, mergedMeasurements.result()) - } -} - -case class MetricSnapshot(instrumentType: InstrumentType, numberOfMeasurements: Long, scale: Scale, - measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike - -object MetricSnapshot { - case class Measurement(value: Long, count: Long) { - def merge(that: Measurement) = Measurement(value, count + that.count) - } -} - -trait MetricGroupFactory { - type GroupRecorder <: MetricGroupRecorder - def create(config: Config): GroupRecorder -} - diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala deleted file mode 100644 index c60babb2..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import scala.collection.concurrent.TrieMap -import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metrics.Metrics.MetricGroupFilter -import kamon.metrics.Subscriptions.Subscribe -import java.util.concurrent.TimeUnit - -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) - - /** Configuration Settings */ - val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) - - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") - - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder]) - else - None - } - - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity) - } - - def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { - subscriptions.tell(Subscribe(category, selection, permanently), receiver) - } - - def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { - (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap - } - - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ - - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } - - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false) - } - - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ - - val filters = config.getObjectList("filters").asScala - - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) - - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList - - (key, MetricGroupFilter(includes, excludes)) - } - - allFilters.toMap - } -} - -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metrics/Scale.scala deleted file mode 100644 index 6899490a..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Scale.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -class Scale(val numericValue: Double) extends AnyVal - -object Scale { - val Nano = new Scale(1E-9) - val Micro = new Scale(1E-6) - val Milli = new Scale(1E-3) - val Unit = new Scale(1) - val Kilo = new Scale(1E3) - val Mega = new Scale(1E6) - val Giga = new Scale(1E9) - - def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue -} diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala deleted file mode 100644 index c9990229..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import akka.actor.{ Props, ActorRef, Actor } -import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } -import kamon.util.GlobPathFilter -import scala.concurrent.duration.{ FiniteDuration, Duration } -import java.util.concurrent.TimeUnit -import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } - -class Subscriptions extends Actor { - import context.system - - val config = context.system.settings.config - val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) - val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - - var lastTick: Long = System.currentTimeMillis() - var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - - def receive = { - case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) - case FlushMetrics ⇒ flush() - } - - def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { - val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) - if (permanent) { - val receivers = subscribedPermanently.get(filter).getOrElse(Nil) - subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) - - } else { - val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) - subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) - } - - } - - def flush(): Unit = { - val currentTick = System.currentTimeMillis() - val snapshots = Kamon(Metrics).collect - - dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) - - lastTick = currentTick - subscribedForOneShot = Map.empty - } - - def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - - for ((filter, receivers) ← subscriptions) yield { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - receivers.foreach(_ ! tickMetrics) - } - } -} - -object Subscriptions { - case object FlushMetrics - case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) - case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - - case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { - def accept(identity: MetricGroupIdentity): Boolean = { - category.equals(identity.category) && globFilter.accept(identity.name) - } - } -} - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - - def receive = empty - - def empty: Actor.Receive = { - case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) - case FlushBuffer ⇒ // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) ⇒ - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become (buffering(combinedSnapshot)) - - case FlushBuffer ⇒ - receiver ! buffered - context become (empty) - - } - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala deleted file mode 100644 index 5454edf5..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.HdrHistogram.HdrRecorder -import scala.collection.concurrent.TrieMap -import com.typesafe.config.Config - -case class TraceMetrics(name: String) extends MetricGroupIdentity { - val category = TraceMetrics -} - -object TraceMetrics extends MetricGroupCategory { - val name = "trace" - - case object ElapsedTime extends MetricIdentity { val name, tag = "elapsed-time" } - case class HttpClientRequest(name: String, tag: String) extends MetricIdentity - - class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder) - extends MetricGroupRecorder { - - private val segments = TrieMap[MetricIdentity, HdrRecorder]() - - def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder = - segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) - - def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), - segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) - } - - case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike]) - extends MetricGroupSnapshot { - - def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime) - } - - val Factory = new MetricGroupFactory { - type GroupRecorder = TraceMetricRecorder - - def create(config: Config): TraceMetricRecorder = { - - val settings = config.getConfig("precision.trace") - val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) - val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) - - new TraceMetricRecorder( - HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano), - () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano)) - } - } - -} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala deleted file mode 100644 index 3a39ec69..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics.instruments - -import org.HdrHistogram.HdrRecorder -import kamon.metrics.{ Scale, MetricSnapshotLike } - -/** - * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is - * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this - * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, - * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing - * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value - * after a new event is processed. - * - */ -class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) - extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) { - - @volatile private var lastRecordedValue: Long = 0 - - override def record(value: Long): Unit = { - lastRecordedValue = value - super.record(value) - } - - override def collect(): MetricSnapshotLike = { - val snapshot = super.collect() - super.record(lastRecordedValue) - - snapshot - } -} - -object ContinuousHdrRecorder { - def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) = - new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala deleted file mode 100644 index e5efbc15..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala +++ /dev/null @@ -1,38 +0,0 @@ -package kamon.metrics.instruments -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -import kamon.metrics._ -import kamon.metrics.MetricSnapshot.Measurement - -import jsr166e.LongAdder - -class CounterRecorder extends MetricRecorder { - private val counter = new LongAdder - - def record(value: Long): Unit = { - counter.add(value) - } - - def collect(): MetricSnapshotLike = { - val sum = counter.sumThenReset() - MetricSnapshot(InstrumentTypes.Counter, sum, Scale.Unit, Vector(Measurement(1, sum))) - } -} - -object CounterRecorder { - def apply(): CounterRecorder = new CounterRecorder() -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala deleted file mode 100644 index ce4fd76d..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.AtomicLongFieldUpdater -import scala.annotation.tailrec -import kamon.metrics._ - -/** - * This implementation aims to be used for real time data collection where data snapshots are taken often over time. - * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) - extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder { - - import HdrRecorder.totalCountUpdater - - def record(value: Long): Unit = recordValue(value) - - def collect(): MetricSnapshotLike = { - val entries = Vector.newBuilder[MetricSnapshot.Measurement] - val countsLength = counts.length() - - @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if (index < countsLength) { - val currentValue = previousValue + increment - val countAtValue = counts.getAndSet(index, 0) - - if (countAtValue > 0) - entries += MetricSnapshot.Measurement(currentValue, countAtValue) - - if (currentValue == bucketLimit) - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) - else - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) - } else { - nrOfRecordings - } - } - - val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) - - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = getTotalCount - val newTotalCount = previousTotalCount - nrOfRecordings - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - - MetricSnapshot(InstrumentTypes.Histogram, nrOfRecordings, scale, entries.result()) - } - -} - -object HdrRecorder { - val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - - def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder = - new HdrRecorder(highestTrackableValue, significantValueDigits, scale) - -} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala deleted file mode 100644 index ba2550af..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala +++ /dev/null @@ -1,58 +0,0 @@ -package kamon.metrics.instruments - -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -import java.lang.Math._ -import jsr166e.LongMaxUpdater -import kamon.util.PaddedAtomicLong -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement - -class MinMaxCounter { - private val min = new LongMaxUpdater - private val max = new LongMaxUpdater - private val sum = new PaddedAtomicLong - - min.update(0L) - max.update(0L) - - def increment(value: Long = 1L): Unit = { - val currentValue = sum.addAndGet(value) - max.update(currentValue) - } - - def decrement(value: Long = 1L): Unit = { - val currentValue = sum.addAndGet(-value) - min.update(-currentValue) - } - - def collect(): CounterMeasurement = { - val currentValue = { - val value = sum.get() - if (value < 0) 0 else value - } - val result = CounterMeasurement(abs(min.maxThenReset()), max.maxThenReset(), currentValue) - max.update(currentValue) - min.update(-currentValue) - result - } -} - -object MinMaxCounter { - def apply() = new MinMaxCounter() - - case class CounterMeasurement(min: Long, max: Long, current: Long) -} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala deleted file mode 100644 index 640157a9..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/package.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon - -import scala.annotation.tailrec -import com.typesafe.config.Config - -package object metrics { - - case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int) - - def extractPrecisionConfig(config: Config): HdrPrecisionConfig = - HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) - - @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - if (right.isEmpty) - left - else { - val (key, rightValue) = right.head - val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) - - combineMaps(left.updated(key, value), right.tail)(valueMerger) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala new file mode 100644 index 00000000..258cc1b2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala @@ -0,0 +1,11 @@ +package kamon.standalone + +import akka.actor.ActorSystem + +object KamonStandalone { + private lazy val system = ActorSystem("kamon-standalone") + + def registerHistogram(name: String) = { + + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 307cf17a..9ce3cd4e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -18,11 +18,11 @@ package kamon.trace import akka.actor.ActorSystem import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware import kamon.trace.TraceContext.SegmentIdentity -import kamon.metrics.TraceMetrics.TraceMetricRecorder +import kamon.metric.TraceMetrics.TraceMetricRecorder trait TraceContext { def name: String @@ -41,7 +41,7 @@ object TraceContext { } trait SegmentCompletionHandle { - def finish(metadata: Map[String, String]) + def finish(metadata: Map[String, String] = Map.empty) } case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String]) @@ -76,7 +76,7 @@ object SegmentCompletionHandleAware { } class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String], - val system: ActorSystem) extends TraceContext { + val system: ActorSystem) extends TraceContext { @volatile private var _isOpen = true val levelOfDetail = OnlyMetrics val startMark = System.nanoTime() diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 0e264cd2..efe08cdb 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -33,7 +33,7 @@ object TraceRecorder { def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet()) private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], - system: ActorSystem): TraceContext = { + system: ActorSystem): TraceContext = { // In the future this should select between implementations. val finalToken = token.getOrElse(newToken) @@ -51,7 +51,7 @@ object TraceRecorder { traceContextStorage.set(Some(ctx)) } - def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] = + def startSegment(identity: SegmentIdentity, metadata: Map[String, String] = Map.empty): Option[SegmentCompletionHandle] = currentContext.map(_.startSegment(identity, metadata)) def rename(name: String): Unit = currentContext.map(_.rename(name)) diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala new file mode 100644 index 00000000..0f682500 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala @@ -0,0 +1,87 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation.akka + +import akka.actor.{ Actor, ActorSystem, Props } +import akka.pattern.{ ask, pipe } +import akka.routing.RoundRobinPool +import akka.testkit.{ ImplicitSender, TestKit } +import akka.util.Timeout +import kamon.trace.TraceRecorder +import org.scalatest.WordSpecLike + +import scala.concurrent.duration._ + +class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instrumentation-spec")) with WordSpecLike + with ImplicitSender { + + implicit val executionContext = system.dispatcher + + "the message passing instrumentation" should { + "propagate the TraceContext using bang" in new EchoActorFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { + ctxEchoActor ! "test" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using tell" in new EchoActorFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { + ctxEchoActor.tell("test", testActor) + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using ask" in new EchoActorFixture { + implicit val timeout = Timeout(1 seconds) + val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { + // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. + (ctxEchoActor ? "test") pipeTo (testActor) + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { + val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + ctxEchoActor ! "test" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + } + + trait EchoActorFixture { + val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) + } + + trait RoutedEchoActorFixture extends EchoActorFixture { + override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinPool(nrOfInstances = 1))) + } +} + +class TraceContextEcho extends Actor { + def receive = { + case msg: String ⇒ sender ! TraceRecorder.currentContext + } +} + diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala new file mode 100644 index 00000000..3dab44bc --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala @@ -0,0 +1,52 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation.akka + +import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } +import akka.event.Logging.LogEvent +import akka.testkit.TestKit +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.scalatest.{ Inspectors, Matchers, WordSpecLike } + +class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging-instrumentation-spec")) with WordSpecLike + with Matchers with Inspectors { + + "the ActorLogging instrumentation" should { + "attach the TraceContext (if available) to log events" in { + val loggerActor = system.actorOf(Props[LoggerActor]) + system.eventStream.subscribe(testActor, classOf[LogEvent]) + + val testTraceContext = TraceRecorder.withNewTraceContext("logging") { + loggerActor ! "info" + TraceRecorder.currentContext + } + + fishForMessage() { + case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ + val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext + ctxInEvent === testTraceContext + + case event: LogEvent ⇒ false + } + } + } +} + +class LoggerActor extends Actor with ActorLogging { + def receive = { + case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) + } +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala new file mode 100644 index 00000000..47867c55 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala @@ -0,0 +1,172 @@ +package kamon.instrumentation.akka + +import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } +import akka.actor._ +import akka.testkit.{ ImplicitSender, TestKit } +import kamon.trace.TraceRecorder +import org.scalatest.WordSpecLike + +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-system-message-instrumentation-spec")) + with WordSpecLike with ImplicitSender { + + implicit val executionContext = system.dispatcher + + "the system message passing instrumentation" should { + "keep the TraceContext while processing the Create message in top level actors" in { + val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { + system.actorOf(Props(new Actor { + testActor ! TraceRecorder.currentContext + def receive: Actor.Receive = { case any ⇒ } + })) + + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "keep the TraceContext while processing the Create message in non top level actors" in { + val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { + system.actorOf(Props(new Actor { + def receive: Actor.Receive = { + case any ⇒ + context.actorOf(Props(new Actor { + testActor ! TraceRecorder.currentContext + def receive: Actor.Receive = { case any ⇒ } + })) + } + })) ! "any" + + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) + } + + "keep the TraceContext in the supervision cycle" when { + "the actor is resumed" in { + val supervisor = supervisorWithDirective(Resume) + + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { + supervisor ! "fail" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + + // Ensure we didn't tie the actor with the context + supervisor ! "context" + expectMsg(None) + } + + "the actor is restarted" in { + val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) + + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { + supervisor ! "fail" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the preRestart hook + expectMsg(testTraceContext) // From the postRestart hook + + // Ensure we didn't tie the actor with the context + supervisor ! "context" + expectMsg(None) + } + + "the actor is stopped" in { + val supervisor = supervisorWithDirective(Stop, sendPostStop = true) + + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { + supervisor ! "fail" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the postStop hook + expectNoMsg(1 second) + } + + "the failure is escalated" in { + val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) + + val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { + supervisor ! "fail" + TraceRecorder.currentContext + } + + expectMsg(testTraceContext) // From the parent executing the supervision strategy + expectMsg(testTraceContext) // From the grandparent executing the supervision strategy + expectMsg(testTraceContext) // From the postStop hook in the child + expectMsg(testTraceContext) // From the postStop hook in the parent + expectNoMsg(1 second) + } + } + } + + def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, + sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { + class GrandParent extends Actor { + val child = context.actorOf(Props(new Parent)) + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop + } + + def receive = { + case any ⇒ child forward any + } + } + + class Parent extends Actor { + val child = context.actorOf(Props(new Child)) + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive + } + + def receive: Actor.Receive = { + case any ⇒ child forward any + } + + override def postStop(): Unit = { + if (sendPostStop) testActor ! TraceRecorder.currentContext + super.postStop() + } + } + + class Child extends Actor { + def receive = { + case "fail" ⇒ 1 / 0 + case "context" ⇒ sender ! TraceRecorder.currentContext + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + if (sendPreRestart) testActor ! TraceRecorder.currentContext + super.preRestart(reason, message) + } + + override def postRestart(reason: Throwable): Unit = { + if (sendPostRestart) testActor ! TraceRecorder.currentContext + super.postRestart(reason) + } + + override def postStop(): Unit = { + if (sendPostStop) testActor ! TraceRecorder.currentContext + super.postStop() + } + + override def preStart(): Unit = { + if (sendPreStart) testActor ! TraceRecorder.currentContext + super.preStart() + } + } + + system.actorOf(Props(new GrandParent)) + } +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala new file mode 100644 index 00000000..d914ffe8 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala @@ -0,0 +1,67 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.instrumentation.akka + +import akka.actor.{ Actor, ActorSystem, Props } +import akka.event.Logging.Warning +import akka.pattern.ask +import akka.testkit.TestKitBase +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString( + """ + |kamon { + | trace { + | ask-pattern-tracing = on + | } + |} + """.stripMargin)) + + "the AskPatternTracing" should { + "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { + implicit val ec = system.dispatcher + implicit val timeout = Timeout(10 milliseconds) + val noReply = system.actorOf(Props[NoReply]) + system.eventStream.subscribe(testActor, classOf[Warning]) + + val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { + noReply ? "hello" + TraceRecorder.currentContext + } + + val warn = expectMsgPF() { + case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn + } + val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext + + capturedCtx should be('defined) + capturedCtx should equal(testTraceContext) + } + } +} + +class NoReply extends Actor { + def receive = { + case any ⇒ + } +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala new file mode 100644 index 00000000..31afd3ff --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala @@ -0,0 +1,63 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation.scala + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import kamon.trace.TraceRecorder +import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } +import org.scalatest.{ Matchers, OptionValues, WordSpecLike } + +import scala.concurrent.Future + +class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers + with ScalaFutures with PatienceConfiguration with OptionValues { + + implicit val execContext = system.dispatcher + + "a Future created with FutureTracing" should { + "capture the TraceContext available when created" which { + "must be available when executing the future's body" in { + + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future(TraceRecorder.currentContext) + + (future, TraceRecorder.currentContext) + } + + whenReady(future)(ctxInFuture ⇒ + ctxInFuture should equal(testTraceContext)) + } + + "must be available when executing callbacks on the future" in { + + val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { + val future = Future("Hello Kamon!") + // The TraceContext is expected to be available during all intermediate processing. + .map(_.length) + .flatMap(len ⇒ Future(len.toString)) + .map(s ⇒ TraceRecorder.currentContext) + + (future, TraceRecorder.currentContext) + } + + whenReady(future)(ctxInFuture ⇒ + ctxInFuture should equal(testTraceContext)) + } + } + } +} + diff --git a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala new file mode 100644 index 00000000..481f03c5 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala @@ -0,0 +1,202 @@ +/* ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.LongBuffer + +import akka.instrumentation.ActorCellMetrics +import kamon.metric.ActorMetricsTestActor._ +import kamon.metric.instrument.Histogram.MutableRecord +import org.scalatest.{ WordSpecLike, Matchers } +import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase } +import akka.actor._ +import com.typesafe.config.ConfigFactory +import scala.concurrent.duration._ +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } + +class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | filters = [ + | { + | actor { + | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect" ] + | excludes = [ "user/tracked-explicitly-excluded"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | + | default-min-max-counter-precision { + | refresh-interval = 1 second + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the Kamon actor metrics" should { + "respect the configured include and exclude filters" in new ActorMetricsFixtures { + val trackedActor = createTestActor("tracked-actor") + actorMetricsRecorderOf(trackedActor) should not be empty + + val nonTrackedActor = createTestActor("non-tracked-actor") + actorMetricsRecorderOf(nonTrackedActor) shouldBe empty + + val trackedButExplicitlyExcluded = createTestActor("tracked-explicitly-excluded") + actorMetricsRecorderOf(trackedButExplicitlyExcluded) shouldBe empty + } + + "reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures { + val trackedActor = createTestActor("clean-after-collect") + val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get + for (i ← 1 to 100) { + trackedActor ! Discard + } + trackedActor ! Fail + trackedActor ! TrackTimings(sleep = Some(1 second)) + expectMsgType[TrackedTimings] + + val firstSnapshot = takeSnapshotOf(trackedActorMetrics) + firstSnapshot.errors.count should be(1L) + firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L + firstSnapshot.processingTime.numberOfMeasurements should be(103L) // 102 examples + Initialize message + firstSnapshot.timeInMailbox.numberOfMeasurements should be(103L) // 102 examples + Initialize message + + val secondSnapshot = takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean + secondSnapshot.errors.count should be(0L) + secondSnapshot.mailboxSize.numberOfMeasurements should be <= 3L + secondSnapshot.processingTime.numberOfMeasurements should be(0L) // 102 examples + Initialize message + secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) // 102 examples + Initialize message + } + + "record the processing-time of the receive function" in new ActorMetricsFixtures { + val trackedActor = createTestActor("measuring-processing-time") + val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get + takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean + + trackedActor ! TrackTimings(sleep = Some(1 second)) + val timings = expectMsgType[TrackedTimings] + val snapshot = takeSnapshotOf(trackedActorMetrics) + + snapshot.processingTime.numberOfMeasurements should be(1L) + snapshot.processingTime.recordsIterator.next().count should be(1L) + snapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the number of errors" in new ActorMetricsFixtures { + val trackedActor = createTestActor("measuring-errors") + val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get + takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean + + for (i ← 1 to 10) { trackedActor ! Fail } + trackedActor ! Ping + expectMsg(Pong) + val snapshot = takeSnapshotOf(trackedActorMetrics) + + snapshot.errors.count should be(10) + } + + "record the mailbox-size" in new ActorMetricsFixtures { + val trackedActor = createTestActor("measuring-mailbox-size") + val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get + takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean + + trackedActor ! TrackTimings(sleep = Some(1 second)) + for (i ← 1 to 10) { + trackedActor ! Discard + } + trackedActor ! Ping + + val timings = expectMsgType[TrackedTimings] + expectMsg(Pong) + val snapshot = takeSnapshotOf(trackedActorMetrics) + + snapshot.mailboxSize.min should be(0L) + snapshot.mailboxSize.max should be(11L +- 1L) + } + + "record the time-in-mailbox" in new ActorMetricsFixtures { + val trackedActor = createTestActor("measuring-time-in-mailbox") + val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get + takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean + + trackedActor ! TrackTimings(sleep = Some(1 second)) + val timings = expectMsgType[TrackedTimings] + val snapshot = takeSnapshotOf(trackedActorMetrics) + + snapshot.timeInMailbox.numberOfMeasurements should be(1L) + snapshot.timeInMailbox.recordsIterator.next().count should be(1L) + snapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + } + + trait ActorMetricsFixtures { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = { + val initialisationListener = TestProbe() + ref.tell(Ping, initialisationListener.ref) + initialisationListener.expectMsg(Pong) + + val underlyingCellField = ref.getClass.getDeclaredMethod("underlying") + val cell = underlyingCellField.invoke(ref).asInstanceOf[ActorCellMetrics] + + cell.actorMetricsRecorder + } + + def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) + + def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) + } +} + +class ActorMetricsTestActor extends Actor { + def receive = { + case Discard ⇒ + case Fail ⇒ 1 / 0 + case Ping ⇒ sender ! Pong + case TrackTimings(sendTimestamp, sleep) ⇒ { + val dequeueTimestamp = System.nanoTime() + sleep.map(s ⇒ Thread.sleep(s.toMillis)) + val afterReceiveTimestamp = System.nanoTime() + + sender ! TrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) + } + } +} + +object ActorMetricsTestActor { + case object Ping + case object Pong + case object Fail + case object Discard + + case class TrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) + case class TrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { + def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp + def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala new file mode 100644 index 00000000..7434c4ee --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala @@ -0,0 +1,105 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import org.scalatest.{ WordSpecLike, Matchers } +import akka.testkit.{ TestProbe, TestKitBase } +import akka.actor.{ ActorRef, Props, ActorSystem } +import com.typesafe.config.ConfigFactory +import scala.concurrent.duration._ +import kamon.Kamon +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.DispatcherMetrics.DispatcherMetricSnapshot + +class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | filters = [ + | { + | dispatcher { + | includes = ["*"] + | excludes = ["dispatcher-explicitly-excluded"] + | } + | } + | ] + |} + | + |dispatcher-explicitly-excluded { + | type = "Dispatcher" + | executor = "fork-join-executor" + |} + | + |tracked-dispatcher { + | type = "Dispatcher" + | executor = "thread-pool-executor" + |} + | + """.stripMargin)) + + "the Kamon dispatcher metrics" should { + "respect the configured include and exclude filters" in { + system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") + system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") + + Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) + expectMsgType[TickMetricSnapshot] + + within(2 seconds) { + val tickSnapshot = expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) + tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) + } + } + + "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { + val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") + + for (_ ← 1 to 100) { + //delayable ! Discard + } + + val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) + dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis + dispatcherMetrics.poolSize.max should be <= 22L //fail in travis + dispatcherMetrics.queueTaskCount.max should be(0L) + dispatcherMetrics.runningThreadCount.max should be(0L) + } + + } + + def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { + val tickSnapshot = within(waitTime) { + listener.expectMsgType[TickMetricSnapshot] + } + val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) + dispatcherMetricsOption should not be empty + dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] + } + + trait DelayableActorFixture { + def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { + val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name) + val metricsListener = TestProbe() + + Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) + // Wait for one empty snapshot before proceeding to the test. + metricsListener.expectMsgType[TickMetricSnapshot] + + (actor, metricsListener) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala new file mode 100644 index 00000000..ee851672 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala @@ -0,0 +1,109 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.instrument.Histogram +import kamon.metric.instrument.Histogram.MutableRecord +import org.scalatest.{ Matchers, WordSpecLike } +import akka.testkit.{ ImplicitSender, TestKitBase } +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.metric.Subscriptions.TickMetricSnapshot + +class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + | ] + |} + """.stripMargin)) + + "the TickMetricSnapshotBuffer" should { + "merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures { + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) + + buffer ! firstEmpty + buffer ! secondEmpty + buffer ! thirdEmpty + + within(2 seconds)(expectNoMsg()) + val mergedSnapshot = expectMsgType[TickMetricSnapshot] + + mergedSnapshot.from should equal(1000) + mergedSnapshot.to should equal(4000) + mergedSnapshot.metrics should be('empty) + } + + "merge empty and non-empty snapshots" in new SnapshotFixtures { + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) + + buffer ! firstNonEmpty + buffer ! secondNonEmpty + buffer ! thirdEmpty + + within(2 seconds)(expectNoMsg()) + val mergedSnapshot = expectMsgType[TickMetricSnapshot] + + mergedSnapshot.from should equal(1000) + mergedSnapshot.to should equal(4000) + mergedSnapshot.metrics should not be ('empty) + + val testMetricSnapshot = mergedSnapshot.metrics(testTraceIdentity).metrics(TraceMetrics.ElapsedTime).asInstanceOf[Histogram.Snapshot] + testMetricSnapshot.min should equal(10) + testMetricSnapshot.max should equal(300) + testMetricSnapshot.numberOfMeasurements should equal(6) + testMetricSnapshot.recordsIterator.toStream should contain allOf ( + MutableRecord(10, 3), + MutableRecord(20, 1), + MutableRecord(30, 1), + MutableRecord(300, 1)) + + } + } + + trait SnapshotFixtures { + val collectionContext = CollectionContext.default + val testTraceIdentity = TraceMetrics("buffer-spec-test-trace") + val traceRecorder = Kamon(Metrics).register(testTraceIdentity, TraceMetrics.Factory).get + + val firstEmpty = TickMetricSnapshot(1000, 2000, Map.empty) + val secondEmpty = TickMetricSnapshot(2000, 3000, Map.empty) + val thirdEmpty = TickMetricSnapshot(3000, 4000, Map.empty) + + traceRecorder.elapsedTime.record(10L) + traceRecorder.elapsedTime.record(20L) + traceRecorder.elapsedTime.record(30L) + val firstNonEmpty = TickMetricSnapshot(1000, 2000, Map( + (testTraceIdentity -> traceRecorder.collect(collectionContext)))) + + traceRecorder.elapsedTime.record(10L) + traceRecorder.elapsedTime.record(10L) + traceRecorder.elapsedTime.record(300L) + val secondNonEmpty = TickMetricSnapshot(1000, 2000, Map( + (testTraceIdentity -> traceRecorder.collect(collectionContext)))) + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala new file mode 100644 index 00000000..dab9b52a --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala @@ -0,0 +1,92 @@ +package kamon.metric + +import akka.actor.ActorSystem +import akka.testkit.{ ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.TraceMetrics.TraceMetricsSnapshot +import kamon.trace.TraceContext.SegmentIdentity +import kamon.trace.TraceRecorder +import org.scalatest.{ Matchers, WordSpecLike } + +class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | + | default-min-max-counter-precision { + | refresh-interval = 1 second + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the TraceMetrics" should { + "record the elapsed time between a trace creation and finish" in { + for (repetitions ← 1 to 10) { + TraceRecorder.withNewTraceContext("record-elapsed-time") { + TraceRecorder.finish() + } + } + + val snapshot = takeSnapshotOf("record-elapsed-time") + snapshot.elapsedTime.numberOfMeasurements should be(10) + snapshot.segments shouldBe empty + } + + "record the elapsed time for segments that occur inside a given trace" in { + TraceRecorder.withNewTraceContext("trace-with-segments") { + val segmentHandle = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment")) + segmentHandle.get.finish() + TraceRecorder.finish() + } + + val snapshot = takeSnapshotOf("trace-with-segments") + snapshot.elapsedTime.numberOfMeasurements should be(1) + snapshot.segments.size should be(1) + snapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1) + } + + "record the elapsed time for segments that finish after their correspondent trace has finished" in { + val segmentHandle = TraceRecorder.withNewTraceContext("closing-segment-after-trace") { + val sh = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment")) + TraceRecorder.finish() + sh + } + + val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") + beforeFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(1) + beforeFinishSegmentSnapshot.segments.size should be(0) + + segmentHandle.get.finish() + + val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") + afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0) + afterFinishSegmentSnapshot.segments.size should be(1) + afterFinishSegmentSnapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1) + } + } + + case class TraceMetricsTestSegment(name: String) extends SegmentIdentity + + def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { + val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) + recorder.get.collect(CollectionContext.default) + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala new file mode 100644 index 00000000..57bc3d0d --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala @@ -0,0 +1,278 @@ +package kamon.metric + +import akka.actor.ActorSystem +import akka.testkit.{ ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.UserMetrics.{ UserGauge, UserMinMaxCounter, UserCounter, UserHistogram } +import kamon.metric.instrument.Histogram +import kamon.metric.instrument.Histogram.MutableRecord +import org.scalatest.{ Matchers, WordSpecLike } +import scala.concurrent.duration._ + +class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | flush-interval = 1 hour + | precision { + | default-histogram-precision { + | highest-trackable-value = 10000 + | significant-value-digits = 2 + | } + | + | default-min-max-counter-precision { + | refresh-interval = 1 hour + | highest-trackable-value = 1000 + | significant-value-digits = 2 + | } + | + | default-gauge-precision { + | refresh-interval = 1 hour + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the UserMetrics extension" should { + "allow registering a fully configured Histogram and get the same Histogram if registering again" in { + val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) + val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) + + histogramA shouldBe theSameInstanceAs(histogramB) + } + + "return the original Histogram when registering a fully configured Histogram for second time but with different settings" in { + val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) + val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Fine, 50000L) + + histogramA shouldBe theSameInstanceAs(histogramB) + } + + "allow registering a Histogram that takes the default configuration from the kamon.metrics.precision settings" in { + Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration") + } + + "allow registering a Counter and get the same Counter if registering again" in { + val counterA = Kamon(UserMetrics).registerCounter("counter") + val counterB = Kamon(UserMetrics).registerCounter("counter") + + counterA shouldBe theSameInstanceAs(counterB) + } + + "allow registering a fully configured MinMaxCounter and get the same MinMaxCounter if registering again" in { + val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) + val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) + + minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB) + } + + "return the original MinMaxCounter when registering a fully configured MinMaxCounter for second time but with different settings" in { + val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) + val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Fine, 5000L, 1 second) + + minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB) + } + + "allow registering a MinMaxCounter that takes the default configuration from the kamon.metrics.precision settings" in { + Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-default-configuration") + } + + "allow registering a fully configured Gauge and get the same Gauge if registering again" in { + val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) { + () ⇒ 1L + } + + val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) { + () ⇒ 1L + } + + gaugeA shouldBe theSameInstanceAs(gaugeB) + } + + "return the original Gauge when registering a fully configured Gauge for second time but with different settings" in { + val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) { + () ⇒ 1L + } + + val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Fine, 5000L, 1 second) { + () ⇒ 1L + } + + gaugeA shouldBe theSameInstanceAs(gaugeB) + } + + "allow registering a Gauge that takes the default configuration from the kamon.metrics.precision settings" in { + Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") { + () ⇒ 2L + } + } + + "generate a snapshot containing all the registered user metrics and reset all instruments" in { + val context = CollectionContext.default + val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get + + val histogramWithSettings = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) + val histogramWithDefaultConfiguration = Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration") + val counter = Kamon(UserMetrics).registerCounter("counter") + val minMaxCounterWithSettings = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) + val gauge = Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") { () ⇒ 2L } + + // lets put some values on those metrics + histogramWithSettings.record(10) + histogramWithSettings.record(20, 100) + histogramWithDefaultConfiguration.record(40) + + counter.increment() + counter.increment(16) + + minMaxCounterWithSettings.increment(43) + minMaxCounterWithSettings.decrement() + + gauge.record(15) + + val firstSnapshot = userMetricsRecorder.collect(context) + + firstSnapshot.histograms.size should be(2) + firstSnapshot.histograms.keys should contain allOf ( + UserHistogram("histogram-with-settings"), + UserHistogram("histogram-with-default-configuration")) + + firstSnapshot.histograms(UserHistogram("histogram-with-settings")).min shouldBe (10) + firstSnapshot.histograms(UserHistogram("histogram-with-settings")).max shouldBe (20) + firstSnapshot.histograms(UserHistogram("histogram-with-settings")).numberOfMeasurements should be(101) + firstSnapshot.histograms(UserHistogram("histogram-with-settings")).recordsIterator.toStream should contain allOf ( + MutableRecord(10, 1), + MutableRecord(20, 100)) + + firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).min shouldBe (40) + firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).max shouldBe (40) + firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).numberOfMeasurements should be(1) + firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).recordsIterator.toStream should contain only ( + MutableRecord(40, 1)) + + firstSnapshot.counters(UserCounter("counter")).count should be(17) + + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).min shouldBe (0) + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).max shouldBe (43) + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).numberOfMeasurements should be(3) + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).recordsIterator.toStream should contain allOf ( + MutableRecord(0, 1), // min + MutableRecord(42, 1), // current + MutableRecord(43, 1)) // max + + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).min shouldBe (0) + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).max shouldBe (0) + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).numberOfMeasurements should be(3) + firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).recordsIterator.toStream should contain only ( + MutableRecord(0, 3)) // min, max and current + + firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).min shouldBe (15) + firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).max shouldBe (15) + firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).numberOfMeasurements should be(1) + firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).recordsIterator.toStream should contain only ( + MutableRecord(15, 1)) // only the manually recorded value + + val secondSnapshot = userMetricsRecorder.collect(context) + + secondSnapshot.histograms.size should be(2) + secondSnapshot.histograms.keys should contain allOf ( + UserHistogram("histogram-with-settings"), + UserHistogram("histogram-with-default-configuration")) + + secondSnapshot.histograms(UserHistogram("histogram-with-settings")).min shouldBe (0) + secondSnapshot.histograms(UserHistogram("histogram-with-settings")).max shouldBe (0) + secondSnapshot.histograms(UserHistogram("histogram-with-settings")).numberOfMeasurements should be(0) + secondSnapshot.histograms(UserHistogram("histogram-with-settings")).recordsIterator.toStream shouldBe empty + + secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).min shouldBe (0) + secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).max shouldBe (0) + secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).numberOfMeasurements should be(0) + secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).recordsIterator.toStream shouldBe empty + + secondSnapshot.counters(UserCounter("counter")).count should be(0) + + secondSnapshot.minMaxCounters.size should be(2) + secondSnapshot.minMaxCounters.keys should contain allOf ( + UserMinMaxCounter("min-max-counter-with-settings"), + UserMinMaxCounter("min-max-counter-with-default-configuration")) + + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).min shouldBe (42) + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).max shouldBe (42) + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).numberOfMeasurements should be(3) + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).recordsIterator.toStream should contain only ( + MutableRecord(42, 3)) // min, max and current + + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).min shouldBe (0) + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).max shouldBe (0) + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).numberOfMeasurements should be(3) + secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).recordsIterator.toStream should contain only ( + MutableRecord(0, 3)) // min, max and current + + secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).min shouldBe (0) + secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).max shouldBe (0) + secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).numberOfMeasurements should be(0) + secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).recordsIterator shouldBe empty + + } + + "generate a snapshot that can be merged with another" in { + val context = CollectionContext.default + val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get + + val histogram = Kamon(UserMetrics).registerHistogram("histogram-for-merge") + val counter = Kamon(UserMetrics).registerCounter("counter-for-merge") + val minMaxCounter = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-for-merge") + val gauge = Kamon(UserMetrics).registerGauge("gauge-for-merge") { () ⇒ 10L } + + histogram.record(100) + counter.increment(10) + minMaxCounter.increment(50) + minMaxCounter.decrement(10) + gauge.record(50) + + val firstSnapshot = userMetricsRecorder.collect(context) + + val extraCounter = Kamon(UserMetrics).registerCounter("extra-counter") + histogram.record(200) + extraCounter.increment(20) + minMaxCounter.increment(40) + minMaxCounter.decrement(50) + gauge.record(70) + + val secondSnapshot = userMetricsRecorder.collect(context) + val mergedSnapshot = firstSnapshot.merge(secondSnapshot, context) + + mergedSnapshot.histograms.keys should contain(UserHistogram("histogram-for-merge")) + + mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).min shouldBe (100) + mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).max shouldBe (200) + mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).numberOfMeasurements should be(2) + mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).recordsIterator.toStream should contain allOf ( + MutableRecord(100, 1), + MutableRecord(200, 1)) + + mergedSnapshot.counters(UserCounter("counter-for-merge")).count should be(10) + mergedSnapshot.counters(UserCounter("extra-counter")).count should be(20) + + mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).min shouldBe (0) + mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).max shouldBe (80) + mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).numberOfMeasurements should be(6) + mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).recordsIterator.toStream should contain allOf ( + MutableRecord(0, 1), // min in first snapshot + MutableRecord(30, 2), // min and current in second snapshot + MutableRecord(40, 1), // current in first snapshot + MutableRecord(50, 1), // max in first snapshot + MutableRecord(80, 1)) // max in second snapshot + + mergedSnapshot.gauges(UserGauge("gauge-for-merge")).min shouldBe (50) + mergedSnapshot.gauges(UserGauge("gauge-for-merge")).max shouldBe (70) + mergedSnapshot.gauges(UserGauge("gauge-for-merge")).numberOfMeasurements should be(2) + mergedSnapshot.gauges(UserGauge("gauge-for-merge")).recordsIterator.toStream should contain allOf ( + MutableRecord(50, 1), + MutableRecord(70, 1)) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala new file mode 100644 index 00000000..1a93e1f6 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala @@ -0,0 +1,55 @@ +package kamon.metric.instrument + +import java.nio.LongBuffer + +import kamon.metric.CollectionContext +import org.scalatest.{ Matchers, WordSpec } + +class CounterSpec extends WordSpec with Matchers { + + "a Counter" should { + "allow increment only operations" in new CounterFixture { + counter.increment() + counter.increment(10) + + intercept[UnsupportedOperationException] { + counter.increment(-10) + } + } + + "reset to zero when a snapshot is taken" in new CounterFixture { + counter.increment(100) + takeSnapshotFrom(counter).count should be(100) + takeSnapshotFrom(counter).count should be(0) + takeSnapshotFrom(counter).count should be(0) + + counter.increment(50) + takeSnapshotFrom(counter).count should be(50) + takeSnapshotFrom(counter).count should be(0) + } + + "produce a snapshot that can be merged with others" in new CounterFixture { + val counterA = Counter() + val counterB = Counter() + counterA.increment(100) + counterB.increment(200) + + val counterASnapshot = takeSnapshotFrom(counterA) + val counterBSnapshot = takeSnapshotFrom(counterB) + + counterASnapshot.merge(counterBSnapshot, collectionContext).count should be(300) + counterBSnapshot.merge(counterASnapshot, collectionContext).count should be(300) + } + + } + + trait CounterFixture { + val counter = Counter() + + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(1) + } + + def takeSnapshotFrom(counter: Counter): Counter.Snapshot = counter.collect(collectionContext) + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala new file mode 100644 index 00000000..b3ff3c9f --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala @@ -0,0 +1,70 @@ +package kamon.metric.instrument + +import java.util.concurrent.atomic.AtomicLong + +import akka.actor.ActorSystem +import com.typesafe.config.ConfigFactory +import kamon.metric.{ Scale, CollectionContext } +import org.scalatest.{ Matchers, WordSpecLike } +import scala.concurrent.duration._ + +class GaugeSpec extends WordSpecLike with Matchers { + val system = ActorSystem("gauge-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | flush-interval = 1 hour + | precision { + | default-gauge-precision { + | refresh-interval = 100 milliseconds + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "a Gauge" should { + "automatically record the current value using the configured refresh-interval" in { + val numberOfValuesRecorded = new AtomicLong(0) + val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) } + + Thread.sleep(1.second.toMillis) + numberOfValuesRecorded.get() should be(10L +- 1L) + gauge.cleanup + } + + "stop automatically recording after a call to cleanup" in { + val numberOfValuesRecorded = new AtomicLong(0) + val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) } + + Thread.sleep(1.second.toMillis) + gauge.cleanup + numberOfValuesRecorded.get() should be(10L +- 1L) + Thread.sleep(1.second.toMillis) + numberOfValuesRecorded.get() should be(10L +- 1L) + } + + "produce a Histogram snapshot including all the recorded values" in { + val numberOfValuesRecorded = new AtomicLong(0) + val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) } + + Thread.sleep(1.second.toMillis) + gauge.cleanup + val snapshot = gauge.collect(CollectionContext.default) + + snapshot.numberOfMeasurements should be(10L +- 1L) + snapshot.min should be(1) + snapshot.max should be(10L +- 1L) + } + + "not record the current value when doing a collection" in { + val numberOfValuesRecorded = new AtomicLong(0) + val gauge = Gauge(Histogram.Precision.Normal, 10000L, Scale.Unit, 1 hour, system)(() ⇒ numberOfValuesRecorded.addAndGet(1)) + + val snapshot = gauge.collect(CollectionContext.default) + + snapshot.numberOfMeasurements should be(0) + numberOfValuesRecorded.get() should be(0) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala new file mode 100644 index 00000000..cefdf0f4 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala @@ -0,0 +1,130 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import java.nio.LongBuffer + +import com.typesafe.config.ConfigFactory +import kamon.metric.CollectionContext +import org.scalatest.{ Matchers, WordSpec } + +import scala.util.Random + +class HistogramSpec extends WordSpec with Matchers { + + val histogramConfig = ConfigFactory.parseString( + """ + | + |highest-trackable-value = 100000 + |significant-value-digits = 2 + | + """.stripMargin) + + "a Histogram" should { + "allow record values within the configured range" in new HistogramFixture { + histogram.record(1000) + histogram.record(5000, count = 100) + histogram.record(10000) + } + + "fail when recording values higher than the highest trackable value" in new HistogramFixture { + intercept[IndexOutOfBoundsException] { + histogram.record(1000000) + } + } + + "reset all recorded levels to zero after a snapshot collection" in new HistogramFixture { + histogram.record(100) + histogram.record(200) + histogram.record(300) + + takeSnapshot().numberOfMeasurements should be(3) + takeSnapshot().numberOfMeasurements should be(0) + } + + "produce a snapshot" which { + "supports min, max and numberOfMeasurements operations" in new HistogramFixture { + histogram.record(100) + histogram.record(200, count = 200) + histogram.record(300) + histogram.record(900) + + val snapshot = takeSnapshot() + + snapshot.min should equal(100L +- 1L) + snapshot.max should equal(900L +- 9L) + snapshot.numberOfMeasurements should be(203) + } + + "can be merged with another snapshot" in new MultipleHistogramFixture { + val random = new Random(System.nanoTime()) + + for (repetitions ← 1 to 1000) { + // Put some values on A and Control + for (_ ← 1 to 1000) { + val newRecording = random.nextInt(100000) + controlHistogram.record(newRecording) + histogramA.record(newRecording) + } + + // Put some values on B and Control + for (_ ← 1 to 2000) { + val newRecording = random.nextInt(100000) + controlHistogram.record(newRecording) + histogramB.record(newRecording) + } + + val controlSnapshot = takeSnapshotFrom(controlHistogram) + val histogramASnapshot = takeSnapshotFrom(histogramA) + val histogramBSnapshot = takeSnapshotFrom(histogramB) + + assertEquals(controlSnapshot, histogramASnapshot.merge(histogramBSnapshot, collectionContext)) + assertEquals(controlSnapshot, histogramBSnapshot.merge(histogramASnapshot, collectionContext)) + } + } + } + } + + trait HistogramFixture { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + val histogram = Histogram.fromConfig(histogramConfig) + + def takeSnapshot(): Histogram.Snapshot = histogram.collect(collectionContext) + } + + trait MultipleHistogramFixture { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + val controlHistogram = Histogram.fromConfig(histogramConfig) + val histogramA = Histogram.fromConfig(histogramConfig) + val histogramB = Histogram.fromConfig(histogramConfig) + + def takeSnapshotFrom(histogram: Histogram): Histogram.Snapshot = histogram.collect(collectionContext) + + def assertEquals(left: Histogram.Snapshot, right: Histogram.Snapshot): Unit = { + left.numberOfMeasurements should equal(right.numberOfMeasurements) + left.min should equal(right.min) + left.max should equal(right.max) + left.recordsIterator.toStream should contain theSameElementsAs (right.recordsIterator.toStream) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala new file mode 100644 index 00000000..cb03664c --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala @@ -0,0 +1,108 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import java.nio.LongBuffer + +import akka.actor.ActorSystem +import com.typesafe.config.ConfigFactory +import kamon.metric.CollectionContext +import kamon.metric.instrument.Histogram.MutableRecord +import org.scalatest.{ Matchers, WordSpecLike } + +class MinMaxCounterSpec extends WordSpecLike with Matchers { + val system = ActorSystem("min-max-counter-spec") + val minMaxCounterConfig = ConfigFactory.parseString( + """ + |refresh-interval = 1 hour + |highest-trackable-value = 1000 + |significant-value-digits = 2 + """.stripMargin) + + "the MinMaxCounter" should { + "track ascending tendencies" in new MinMaxCounterFixture { + mmCounter.increment() + mmCounter.increment(3) + mmCounter.increment() + + val snapshot = collectCounterSnapshot() + + snapshot.min should be(0) + snapshot.max should be(5) + snapshot.recordsIterator.toStream should contain allOf ( + MutableRecord(0, 1), // min + MutableRecord(5, 2)) // max and current + } + + "track descending tendencies" in new MinMaxCounterFixture { + mmCounter.increment(5) + mmCounter.decrement() + mmCounter.decrement(3) + mmCounter.decrement() + + val snapshot = collectCounterSnapshot() + + snapshot.min should be(0) + snapshot.max should be(5) + snapshot.recordsIterator.toStream should contain allOf ( + MutableRecord(0, 2), // min and current + MutableRecord(5, 1)) // max + } + + "reset the min and max to the current value after taking a snapshot" in new MinMaxCounterFixture { + mmCounter.increment(5) + mmCounter.decrement(3) + + val firstSnapshot = collectCounterSnapshot() + + firstSnapshot.min should be(0) + firstSnapshot.max should be(5) + firstSnapshot.recordsIterator.toStream should contain allOf ( + MutableRecord(0, 1), // min + MutableRecord(2, 1), // current + MutableRecord(5, 1)) // max + + val secondSnapshot = collectCounterSnapshot() + + secondSnapshot.min should be(2) + secondSnapshot.max should be(2) + secondSnapshot.recordsIterator.toStream should contain( + MutableRecord(2, 3)) // min, max and current + } + + "report zero as the min and current values if they current value fell bellow zero" in new MinMaxCounterFixture { + mmCounter.decrement(3) + + val snapshot = collectCounterSnapshot() + + snapshot.min should be(0) + snapshot.max should be(0) + snapshot.recordsIterator.toStream should contain( + MutableRecord(0, 3)) // min, max and current (even while current really is -3 + } + } + + trait MinMaxCounterFixture { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(64) + } + + val mmCounter = MinMaxCounter.fromConfig(minMaxCounterConfig, system).asInstanceOf[PaddedMinMaxCounter] + mmCounter.cleanup // cancel the refresh schedule + + def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext) + } +} diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala deleted file mode 100644 index 645ca96a..00000000 --- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ TestProbe, TestKitBase } -import akka.actor.{ ActorRef, Actor, Props, ActorSystem } -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import kamon.Kamon -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.ActorMetrics.ActorMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement - -class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | filters = [ - | { - | actor { - | includes = [ "user/tracked-*" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - |} - """.stripMargin)) - - "the Kamon actor metrics" should { - "respect the configured include and exclude filters" in new DelayableActorFixture { - val tracked = system.actorOf(Props[DelayableActor], "tracked-actor") - val nonTracked = system.actorOf(Props[DelayableActor], "non-tracked-actor") - val trackedExplicitlyExcluded = system.actorOf(Props[DelayableActor], "tracked-explicitly-excluded") - - Kamon(Metrics).subscribe(ActorMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - tracked ! Discard - nonTracked ! Discard - trackedExplicitlyExcluded ! Discard - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(ActorMetrics("user/tracked-actor")) - tickSnapshot.metrics.keys should not contain (ActorMetrics("user/non-tracked-actor")) - tickSnapshot.metrics.keys should not contain (ActorMetrics("user/tracked-explicitly-excluded")) - } - } - - "record mailbox-size, processing-time and time-in-mailbox metrics under regular conditions" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("tracked-normal-conditions") - - for (_ ← 1 to 10) { - delayable ! Discard - } - - val actorMetrics = expectActorMetrics("user/tracked-normal-conditions", metricsListener, 3 seconds) - actorMetrics.mailboxSize.max should be <= 10L - actorMetrics.processingTime.numberOfMeasurements should be(10L) - actorMetrics.timeInMailbox.numberOfMeasurements should be(10L) - } - - "keep a correct mailbox-size even if the actor is blocked processing a message" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("tracked-mailbox-size-queueing-up") - - delayable ! Delay(2500 milliseconds) - for (_ ← 1 to 9) { - delayable ! Discard - } - - // let the first snapshot pass - metricsListener.expectMsgType[TickMetricSnapshot] - - // process the tick in which the actor is stalled. - val stalledTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2 seconds) - stalledTickMetrics.mailboxSize.numberOfMeasurements should equal(30) - // only the automatic last-value recording should be taken, and includes the message being currently processed. - stalledTickMetrics.mailboxSize.measurements should contain only (Measurement(10, 30)) - stalledTickMetrics.mailboxSize.min should equal(10) - stalledTickMetrics.mailboxSize.max should equal(10) - stalledTickMetrics.processingTime.numberOfMeasurements should be(0L) - stalledTickMetrics.timeInMailbox.numberOfMeasurements should be(0L) - - // process the tick after the actor is unblocked. - val afterStallTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2 seconds) - afterStallTickMetrics.processingTime.numberOfMeasurements should be(10L) - afterStallTickMetrics.timeInMailbox.numberOfMeasurements should be(10L) - afterStallTickMetrics.processingTime.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos) - afterStallTickMetrics.timeInMailbox.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos) - } - - "track the number of errors" in new ErrorActorFixture { - val (error, metricsListener) = failedActor("tracked-errors") - - for (_ ← 1 to 5) { - error ! Error - } - - val actorMetrics = expectActorMetrics("user/tracked-errors", metricsListener, 3 seconds) - actorMetrics.errorCounter.numberOfMeasurements should be(5L) - } - } - - def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] - } - val actorMetricsOption = tickSnapshot.metrics.get(ActorMetrics(actorPath)) - actorMetricsOption should not be empty - actorMetricsOption.get.asInstanceOf[ActorMetricSnapshot] - } - - trait DelayableActorFixture { - def delayableActor(name: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[DelayableActor], name) - val metricsListener = TestProbe() - - Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] - - (actor, metricsListener) - } - } - - trait ErrorActorFixture { - def failedActor(name: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[FailedActor], name) - val metricsListener = TestProbe() - - Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] - - (actor, metricsListener) - } - } -} - -class DelayableActor extends Actor { - def receive = { - case Delay(time) ⇒ Thread.sleep(time.toMillis) - case Discard ⇒ - } -} - -class FailedActor extends Actor { - def receive = { - case Error ⇒ 1 / 0 - case Discard ⇒ - } -} - -case object Discard - -case class Delay(time: FiniteDuration) - -case class Error() diff --git a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala b/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala deleted file mode 100644 index 1e072f71..00000000 --- a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import akka.testkit.TestKitBase -import org.scalatest.{ Matchers, WordSpecLike } -import akka.actor.ActorSystem -import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement - -class CustomMetricSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | filters = [ - | { - | custom-metric { - | includes = [ "test/*" ] - | excludes = [ ] - | } - | } - | ] - |} - """.stripMargin)) - - "the Kamon custom metrics support" should { - "allow registering a custom metric with the Metrics extension" in { - val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit)) - - recorder should be('defined) - } - - "allow subscriptions to custom metrics using the default subscription protocol" in { - val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit)) - - recorder.map { r ⇒ - r.record(100) - r.record(15) - r.record(0) - r.record(50) - } - - Kamon(Metrics).subscribe(CustomMetric, "test/sample-counter", testActor) - - val recordedValues = within(5 seconds) { - val snapshot = expectMsgType[TickMetricSnapshot] - snapshot.metrics(CustomMetric("test/sample-counter")).metrics(CustomMetric.RecordedValues) - } - - recordedValues.min should equal(0) - recordedValues.max should equal(100) - recordedValues.numberOfMeasurements should equal(4) - recordedValues.measurements should contain allOf ( - Measurement(0, 1), - Measurement(15, 1), - Measurement(50, 1), - Measurement(100, 1)) - } - } - -} diff --git a/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala deleted file mode 100644 index 2a9cb6b4..00000000 --- a/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ TestProbe, TestKitBase } -import akka.actor.{ ActorRef, Props, ActorSystem } -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import kamon.Kamon -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.DispatcherMetrics.DispatcherMetricSnapshot - -class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | filters = [ - | { - | dispatcher { - | includes = ["*"] - | excludes = ["dispatcher-explicitly-excluded"] - | } - | } - | ] - |} - | - |dispatcher-explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" - |} - | - |tracked-dispatcher { - | type = "Dispatcher" - | executor = "thread-pool-executor" - |} - | - """.stripMargin)) - - "the Kamon dispatcher metrics" should { - "respect the configured include and exclude filters" in { - system.actorOf(Props[DelayableActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") - system.actorOf(Props[DelayableActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") - - Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) - tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) - } - } - - "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") - - for (_ ← 1 to 100) { - delayable ! Discard - } - - val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) - dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis - dispatcherMetrics.poolSize.max should be <= 22L //fail in travis - dispatcherMetrics.queueTaskCount.max should be(0L) - dispatcherMetrics.runningThreadCount.max should be(0L) - } - - } - - def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] - } - val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) - dispatcherMetricsOption should not be empty - dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] - } - - trait DelayableActorFixture { - def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[DelayableActor].withDispatcher(dispatcher), name) - val metricsListener = TestProbe() - - Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] - - (actor, metricsListener) - } - } -} diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala deleted file mode 100644 index 4d6ebc49..00000000 --- a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.scalatest.{ Matchers, WordSpec } -import kamon.metrics.MetricSnapshot.Measurement - -class MetricSnapshotSpec extends WordSpec with Matchers { - - "a metric snapshot" should { - "support a max operation" in new SnapshotFixtures { - snapshotA.max should be(17) - snapshotB.max should be(10) - snapshotC.max should be(1) - } - - "support a min operation" in new SnapshotFixtures { - snapshotA.min should be(1) - snapshotB.min should be(2) - snapshotC.min should be(1) - } - - "be able to merge with other snapshot" in new SnapshotFixtures { - val merged = snapshotA.merge(snapshotB).merge(snapshotC) - - merged.min should be(1) - merged.max should be(17) - merged.numberOfMeasurements should be(300) - merged.measurements.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17) - } - - "be able to merge with empty snapshots" in new SnapshotFixtures { - snapshotA.merge(emptySnapshot) should be(snapshotA) - emptySnapshot.merge(snapshotA).merge(emptySnapshot) should be(snapshotA) - snapshotC.merge(emptySnapshot) should be(snapshotC) - } - - } - - trait SnapshotFixtures { - val emptySnapshot = MetricSnapshot(InstrumentTypes.Histogram, 0, Scale.Unit, Vector.empty) - - val snapshotA = MetricSnapshot(InstrumentTypes.Histogram, 100, Scale.Unit, Vector( - Measurement(1, 3), - Measurement(2, 15), - Measurement(5, 68), - Measurement(7, 13), - Measurement(17, 1))) - - val snapshotB = MetricSnapshot(InstrumentTypes.Histogram, 100, Scale.Unit, Vector( - Measurement(2, 6), - Measurement(4, 48), - Measurement(5, 39), - Measurement(10, 7))) - - val snapshotC = MetricSnapshot(InstrumentTypes.Counter, 100, Scale.Unit, Vector(Measurement(1, 100))) - } -} diff --git a/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala deleted file mode 100644 index d0a0c707..00000000 --- a/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.scalatest.{ Matchers, WordSpecLike } -import akka.testkit.TestKit -import akka.actor.ActorSystem -import scala.concurrent.duration._ -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement - -class TickMetricSnapshotBufferSpec extends TestKit(ActorSystem("tick-metric-snapshot-buffer")) with WordSpecLike with Matchers { - - "the TickMetricSnapshotBuffer" should { - "merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures { - val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) - - buffer ! firstEmpty - buffer ! secondEmpty - buffer ! thirdEmpty - - within(2 seconds)(expectNoMsg()) - val mergedSnapshot = expectMsgType[TickMetricSnapshot] - - mergedSnapshot.from should equal(1000) - mergedSnapshot.to should equal(4000) - mergedSnapshot.metrics should be('empty) - } - - "merge empty and non-empty snapshots" in new SnapshotFixtures { - val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) - - buffer ! firstNonEmpty - buffer ! secondNonEmpty - buffer ! thirdEmpty - - within(2 seconds)(expectNoMsg()) - val mergedSnapshot = expectMsgType[TickMetricSnapshot] - - mergedSnapshot.from should equal(1000) - mergedSnapshot.to should equal(4000) - mergedSnapshot.metrics should not be ('empty) - - val testMetricSnapshot = mergedSnapshot.metrics(CustomMetric("test-metric")).metrics(CustomMetric.RecordedValues) - testMetricSnapshot.min should equal(1) - testMetricSnapshot.max should equal(10) - testMetricSnapshot.numberOfMeasurements should equal(35) - testMetricSnapshot.measurements should contain allOf (Measurement(1, 10), Measurement(4, 9), Measurement(10, 16)) - - } - } - - trait SnapshotFixtures { - val firstEmpty = TickMetricSnapshot(1000, 2000, Map.empty) - val secondEmpty = TickMetricSnapshot(2000, 3000, Map.empty) - val thirdEmpty = TickMetricSnapshot(3000, 4000, Map.empty) - - val firstNonEmpty = TickMetricSnapshot(1000, 2000, - Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(InstrumentTypes.Histogram, 20, Scale.Unit, Vector(Measurement(1, 10), Measurement(10, 10)))))))) - - val secondNonEmpty = TickMetricSnapshot(1000, 2000, - Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(InstrumentTypes.Histogram, 15, Scale.Unit, Vector(Measurement(4, 9), Measurement(10, 6)))))))) - - } - - case class SimpleGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot -} diff --git a/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala deleted file mode 100644 index 14f1573f..00000000 --- a/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.metrics.instrument - -import org.scalatest.{ Matchers, WordSpecLike } -import kamon.metrics.instruments.MinMaxCounter -import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement - -class MinMaxCounterSpec extends WordSpecLike with Matchers { - - "the MinMaxCounter" should { - "increment" in { - val counter = MinMaxCounter() - - counter.increment() - counter.increment() - counter.increment() - counter.increment() - counter.increment() - - val CounterMeasurement(_, _, current) = counter.collect() - - current should be(5) - } - - "decrement" in { - val counter = MinMaxCounter() - counter.increment(5L) - - counter.decrement() - counter.decrement() - counter.decrement() - counter.decrement() - counter.decrement() - - val CounterMeasurement(_, _, current) = counter.collect() - - current should be(0) - } - - "reset the min and max with the sum value when the collect method is called" in { - val counter = MinMaxCounter() - - counter.increment(10) - counter.increment(20) - counter.increment(30) - counter.increment(40) - counter.increment(50) - - counter.collect() //only for check the last value after reset min max - - val CounterMeasurement(min, max, current) = counter.collect() - - min should be(current) - max should be(current) - current should be(150) - } - } - - "track the min value" in { - val counter = MinMaxCounter() - - counter.increment(10) - counter.increment(20) - counter.increment(30) - counter.increment(40) - counter.increment(50) - - val CounterMeasurement(min, _, _) = counter.collect() - - min should be(0) - - counter.increment(50) - - val CounterMeasurement(minAfterCollectAndAddSomeValues, _, _) = counter.collect() - - minAfterCollectAndAddSomeValues should be(150) - } - - "track the max value" in { - val counter = MinMaxCounter() - counter.increment(10) - counter.increment(20) - counter.increment(30) - counter.increment(40) - counter.increment(50) - - val CounterMeasurement(_, max, _) = counter.collect() - - max should be(150) - - counter.increment(200) - - val CounterMeasurement(_, maxAfterCollectAndAddSomeValues, _) = counter.collect() - - maxAfterCollectAndAddSomeValues should be(350) - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala new file mode 100644 index 00000000..4d0049f1 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala @@ -0,0 +1,95 @@ +package kamon.trace + +import akka.actor.ActorSystem +import akka.testkit.{ ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.trace.TraceContext.SegmentIdentity +import org.scalatest.{ Matchers, WordSpecLike } + +class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | + | default-min-max-counter-precision { + | refresh-interval = 1 second + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the TraceRecorder api" should { + "allow starting a trace within a specified block of code, and only within that block of code" in { + val createdContext = TraceRecorder.withNewTraceContext("start-context") { + TraceRecorder.currentContext should not be empty + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("start-context") + } + + "allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in { + val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) { + TraceRecorder.currentContext should not be empty + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("start-context-with-token") + createdContext.token should be("token-1") + } + + "allow providing a TraceContext and make it available within a block of code" in { + val createdContext = TraceRecorder.withNewTraceContext("manually-provided-trace-context") { TraceRecorder.currentContext } + + TraceRecorder.currentContext shouldBe empty + TraceRecorder.withTraceContext(createdContext) { + TraceRecorder.currentContext should be(createdContext) + } + + TraceRecorder.currentContext shouldBe empty + } + + "allow renaming a trace" in { + val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") { + TraceRecorder.rename("renamed-trace") + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("renamed-trace") + } + + "allow creating a segment within a trace" in { + val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") { + val segmentHandle = TraceRecorder.startSegment(TraceManipulationTestSegment("segment-1")) + + TraceRecorder.currentContext.get + } + + TraceRecorder.currentContext shouldBe empty + createdContext.name shouldBe ("trace-with-segments") + + } + } + + case class TraceManipulationTestSegment(name: String) extends SegmentIdentity + +} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala deleted file mode 100644 index 81fd9cbc..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import akka.testkit.TestKit -import org.scalatest.{ Inspectors, Matchers, WordSpecLike } -import akka.actor.{ Props, ActorLogging, Actor, ActorSystem } -import akka.event.Logging.LogEvent -import kamon.trace.{ TraceContextAware, TraceRecorder } - -class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors { - - "the ActorLogging instrumentation" should { - "attach the TraceContext (if available) to log events" in { - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - - val testTraceContext = TraceRecorder.withNewTraceContext("logging") { - loggerActor ! "info" - TraceRecorder.currentContext - } - - fishForMessage() { - case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ - val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext - ctxInEvent === testTraceContext - - case event: LogEvent ⇒ false - } - } - } -} - -class LoggerActor extends Actor with ActorLogging { - def receive = { - case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala deleted file mode 100644 index 4e62c9f7..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import org.scalatest.WordSpecLike -import akka.actor.{ Actor, Props, ActorSystem } - -import akka.testkit.{ ImplicitSender, TestKit } -import kamon.trace.TraceRecorder -import akka.pattern.{ pipe, ask } -import akka.util.Timeout -import scala.concurrent.duration._ -import akka.routing.{ RoundRobinPool } - -class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { - implicit val executionContext = system.dispatcher - - "the message passing instrumentation" should { - "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { - ctxEchoActor.tell("test", testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using ask" in new EchoActorFixture { - implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { - // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. - (ctxEchoActor ? "test") pipeTo (testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - } - - trait EchoActorFixture { - val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) - } - - trait RoutedEchoActorFixture extends EchoActorFixture { - override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinPool(nrOfInstances = 1))) - } -} - -class TraceContextEcho extends Actor { - def receive = { - case msg: String ⇒ sender ! TraceRecorder.currentContext - } -} - diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala deleted file mode 100644 index ed239b38..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala +++ /dev/null @@ -1,169 +0,0 @@ -package kamon.trace.instrumentation - -import akka.testkit.{ ImplicitSender, TestKit } -import akka.actor._ -import org.scalatest.WordSpecLike -import kamon.trace.TraceRecorder -import scala.util.control.NonFatal -import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume } -import scala.concurrent.duration._ - -class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { - implicit val executionContext = system.dispatcher - - "the system message passing instrumentation" should { - "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { - system.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { - system.actorOf(Props(new Actor { - def receive: Actor.Receive = { - case any ⇒ - context.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - } - })) ! "any" - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext in the supervision cycle" when { - "the actor is resumed" in { - val supervisor = supervisorWithDirective(Resume) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(None) - } - - "the actor is restarted" in { - val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the preRestart hook - expectMsg(testTraceContext) // From the postRestart hook - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(None) - } - - "the actor is stopped" in { - val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook - expectNoMsg(1 second) - } - - "the failure is escalated" in { - val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the grandparent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook in the child - expectMsg(testTraceContext) // From the postStop hook in the parent - expectNoMsg(1 second) - } - } - } - - def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, - sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { - class GrandParent extends Actor { - val child = context.actorOf(Props(new Parent)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop - } - - def receive = { - case any ⇒ child forward any - } - } - - class Parent extends Actor { - val child = context.actorOf(Props(new Child)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive - } - - def receive: Actor.Receive = { - case any ⇒ child forward any - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - } - - class Child extends Actor { - def receive = { - case "fail" ⇒ 1 / 0 - case "context" ⇒ sender ! TraceRecorder.currentContext - } - - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceRecorder.currentContext - super.preRestart(reason, message) - } - - override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceRecorder.currentContext - super.postRestart(reason) - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - - override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceRecorder.currentContext - super.preStart() - } - } - - system.actorOf(Props(new GrandParent)) - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala deleted file mode 100644 index fb886de6..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace.instrumentation - -import akka.testkit.TestKitBase -import akka.actor.{ Props, Actor, ActorSystem } -import org.scalatest.{ Matchers, WordSpecLike } -import akka.event.Logging.Warning -import scala.concurrent.duration._ -import akka.pattern.ask -import akka.util.Timeout -import kamon.trace.{ TraceContextAware, TraceRecorder } -import com.typesafe.config.ConfigFactory - -class AskPatternTracingSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString( - """ - |kamon { - | trace { - | ask-pattern-tracing = on - | } - |} - """.stripMargin)) - - "the AskPatternTracing" should { - "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { - implicit val ec = system.dispatcher - implicit val timeout = Timeout(10 milliseconds) - val noReply = system.actorOf(Props[NoReply]) - system.eventStream.subscribe(testActor, classOf[Warning]) - - val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { - noReply ? "hello" - TraceRecorder.currentContext - } - - val warn = expectMsgPF() { - case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn - } - val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext - - capturedCtx should be('defined) - capturedCtx should equal(testTraceContext) - } - } -} - -class NoReply extends Actor { - def receive = { - case any ⇒ - } -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala deleted file mode 100644 index b1765fd8..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import scala.concurrent.{ ExecutionContext, Future } -import org.scalatest.{ Matchers, OptionValues, WordSpecLike } -import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration } -import kamon.trace.TraceRecorder -import akka.testkit.TestKit -import akka.actor.ActorSystem - -class FutureTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with Matchers - with ScalaFutures with PatienceConfiguration with OptionValues { - - implicit val execContext = system.dispatcher - - "a Future created with FutureTracing" should { - "capture the TraceContext available when created" which { - "must be available when executing the future's body" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future(TraceRecorder.currentContext) - - (future, TraceRecorder.currentContext) - } - - whenReady(future)(ctxInFuture ⇒ - ctxInFuture should equal(testTraceContext)) - } - - "must be available when executing callbacks on the future" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future("Hello Kamon!") - // The TraceContext is expected to be available during all intermediate processing. - .map(_.length) - .flatMap(len ⇒ Future(len.toString)) - .map(s ⇒ TraceRecorder.currentContext) - - (future, TraceRecorder.currentContext) - } - - whenReady(future)(ctxInFuture ⇒ - ctxInFuture should equal(testTraceContext)) - } - } - } -} - diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala index 15d5d3fe..b4358ce7 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -18,7 +18,7 @@ package kamon.datadog import akka.actor._ import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import scala.concurrent.duration._ import scala.collection.JavaConverters._ import com.typesafe.config.Config diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index 028e9608..17e19d0b 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -20,11 +20,10 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import kamon.metric.Subscriptions.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } -import kamon.metrics.{ MetricIdentity, MetricGroupIdentity } +import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.{ MetricIdentity, MetricGroupIdentity } import java.util.Locale class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { @@ -50,7 +49,7 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for { (groupIdentity, groupSnapshot) ← tick.metrics @@ -59,33 +58,35 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long val key = buildMetricName(groupIdentity, metricIdentity) - for (measurement ← metricSnapshot.measurements) { - val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType) - dataBuilder.appendMeasurement(key, measurementData) + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDTimer(record.level, record.count)) + packetBuilder.appendMeasurement(key, measurementData) + + } + + case cs: Counter.Snapshot ⇒ + val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDCounter(cs.count)) + packetBuilder.appendMeasurement(key, measurementData) } } - dataBuilder.flush() + packetBuilder.flush() } - def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement, - instrumentType: InstrumentType): String = { - - StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType)) + def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurementData: String): String = + StringBuilder.newBuilder + .append(measurementData) .append(buildIdentificationTag(groupIdentity, metricIdentity)) .result() - } - def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = { - def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = - s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}" - - instrumentType match { - case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") - } + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") } + def encodeStatsDCounter(count: Long): String = count.toString + "|c" + def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = s"$appName.${groupIdentity.category.name}.${metricIdentity.name}" diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala index 6a7191a1..cb82c362 100644 --- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -16,14 +16,14 @@ package kamon.datadog -import akka.testkit.{TestKitBase, TestProbe} -import akka.actor.{Props, ActorRef, ActorSystem} -import kamon.metrics.instruments.CounterRecorder -import org.scalatest.{Matchers, WordSpecLike} -import kamon.metrics._ +import akka.testkit.{ TestKitBase, TestProbe } +import akka.actor.{ Props, ActorRef, ActorSystem } +import kamon.metric.instrument.Histogram.Precision +import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter } +import org.scalatest.{ Matchers, WordSpecLike } +import kamon.metric._ import akka.io.Udp -import org.HdrHistogram.HdrRecorder -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.Subscriptions.TickMetricSnapshot import java.lang.management.ManagementFactory import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory @@ -32,13 +32,15 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher implicit lazy val system = ActorSystem("datadog-metric-sender-spec", ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes")) + val context = CollectionContext.default + "the DataDogMetricSender" should { "send latency measurements" in new UdpListenerFixture { val testMetricName = "processing-time" - val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect())) + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon") @@ -46,11 +48,11 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher "include the sampling rate in case of multiple measurements of the same value" in new UdpListenerFixture { val testMetricName = "processing-time" - val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect())) + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon") @@ -58,7 +60,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { val testMetricName = "processing-time" - val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit) + val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit) var bytes = 0 var level = 0 @@ -69,8 +71,8 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length } - val udp = setup(Map(testMetricName -> testRecorder.collect())) - udp.expectMsgType[Udp.Send]// let the first flush pass + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon") @@ -81,24 +83,21 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher val secondTestMetricName = "processing-time-2" val thirdTestMetricName = "counter" - val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) - val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) - val thirdTestRecorder = CounterRecorder() + val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) + val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) + val thirdTestRecorder = Counter() firstTestRecorder.record(10L) firstTestRecorder.record(10L) secondTestRecorder.record(21L) - thirdTestRecorder.record(1L) - thirdTestRecorder.record(1L) - thirdTestRecorder.record(1L) - thirdTestRecorder.record(1L) + thirdTestRecorder.increment(4L) val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(), - secondTestMetricName -> secondTestRecorder.collect(), - thirdTestMetricName -> thirdTestRecorder.collect())) + firstTestMetricName -> firstTestRecorder.collect(context), + secondTestMetricName -> secondTestRecorder.collect(context), + thirdTestMetricName -> thirdTestRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon") @@ -109,7 +108,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size") - def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = { + def setup(metrics: Map[String, MetricSnapshot]): TestProbe = { val udp = TestProbe() val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) { override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref @@ -137,7 +136,10 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher } metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot { - val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap + type GroupSnapshotType = Histogram.Snapshot + def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ??? + + val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap })) udp } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala index 08e0add3..44d9c605 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala @@ -17,16 +17,16 @@ package kamon.newrelic import akka.actor.Actor -import kamon.metrics._ +import kamon.metric._ trait CustomMetrics { self: Actor ⇒ def collectCustomMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { metrics.collect { - case (CustomMetric(name), groupSnapshot) ⇒ + case (UserMetrics, groupSnapshot) ⇒ groupSnapshot.metrics collect { - case (_, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/$name", None, snapshot) + case (name, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/$name", None, snapshot) } }.flatten.toSeq } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala index 46e22571..a3bb6311 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala @@ -17,7 +17,7 @@ package kamon.newrelic import akka.actor.{ Props, ActorRef, Actor } -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.newrelic.MetricTranslator.TimeSliceMetrics class MetricTranslator(receiver: ActorRef) extends Actor diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index c195ed12..85861454 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -18,8 +18,8 @@ package kamon.newrelic import akka.actor._ import scala.concurrent.duration._ import kamon.Kamon -import kamon.metrics.{ CustomMetric, TickMetricSnapshotBuffer, TraceMetrics, Metrics } -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.{ UserMetrics, TickMetricSnapshotBuffer, TraceMetrics, Metrics } +import kamon.metric.Subscriptions.TickMetricSnapshot import akka.actor import java.util.concurrent.TimeUnit.MILLISECONDS @@ -30,7 +30,7 @@ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds. Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(CustomMetric, "*", metricsListener, permanently = true) + //Kamon(Metrics)(system).subscribe(UserMetrics, "*", metricsListener, permanently = true) } class NewRelicMetricsListener extends Actor with ActorLogging { @@ -50,7 +50,7 @@ object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double, - min: Double, max: Double, sumOfSquares: Double) { + min: Double, max: Double, sumOfSquares: Double) { def merge(that: Metric): Metric = { Metric(name, scope, diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala index 90f1e8a5..38517e10 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala @@ -16,8 +16,8 @@ package kamon.newrelic -import kamon.metrics._ -import kamon.metrics.TraceMetrics.ElapsedTime +import kamon.metric._ +import kamon.metric.TraceMetrics.ElapsedTime import akka.actor.Actor import kamon.Kamon @@ -27,6 +27,7 @@ trait WebTransactionMetrics { def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = { val apdexBuilder = new ApdexBuilder("Apdex", None, Kamon(NewRelic)(context.system).apdexT) + /* // Trace metrics are recorded in nanoseconds. var accumulatedHttpDispatcher: MetricSnapshotLike = MetricSnapshot(InstrumentTypes.Histogram, 0, Scale.Nano, Vector.empty) @@ -46,7 +47,9 @@ trait WebTransactionMetrics { val httpDispatcher = toNewRelicMetric(Scale.Unit)("HttpDispatcher", None, accumulatedHttpDispatcher) val webTransaction = toNewRelicMetric(Scale.Unit)("WebTransaction", None, accumulatedHttpDispatcher) - Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq + Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq */ + + ??? } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala index f6e377c7..89a8b15b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala @@ -16,26 +16,30 @@ package kamon -import kamon.metrics.{ Scale, MetricSnapshotLike } +import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.{ MetricSnapshot, Scale } package object newrelic { - def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshotLike): NewRelic.Metric = { - var total: Double = 0D - var sumOfSquares: Double = 0D + def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = { + snapshot match { + case hs: Histogram.Snapshot ⇒ + var total: Double = 0D + var sumOfSquares: Double = 0D + val scaledMin = Scale.convert(hs.scale, scale, hs.min) + val scaledMax = Scale.convert(hs.scale, scale, hs.max) - val measurementLevels = snapshot.measurements.iterator - while (measurementLevels.hasNext) { - val level = measurementLevels.next() - val scaledValue = Scale.convert(snapshot.scale, scale, level.value) + hs.recordsIterator.foreach { record ⇒ + val scaledValue = Scale.convert(hs.scale, scale, record.level) - total += scaledValue * level.count - sumOfSquares += (scaledValue * scaledValue) * level.count - } + total += scaledValue * record.count + sumOfSquares += (scaledValue * scaledValue) * record.count + } - val scaledMin = Scale.convert(snapshot.scale, scale, snapshot.min) - val scaledMax = Scale.convert(snapshot.scale, scale, snapshot.max) + NewRelic.Metric(name, scope, hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) - NewRelic.Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) + case cs: Counter.Snapshot ⇒ + NewRelic.Metric(name, scope, cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count) + } } } diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index 0951d2c9..2862ba19 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -19,7 +19,7 @@ package kamon.play.instrumentation import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect } import org.aspectj.lang.ProceedingJoinPoint import kamon.trace.TraceRecorder -import kamon.metrics.TraceMetrics.HttpClientRequest +import kamon.metric.TraceMetrics.HttpClientRequest import play.api.libs.ws.WSRequest import scala.concurrent.Future import play.api.libs.ws.WSResponse @@ -36,7 +36,7 @@ class WSInstrumentation { def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = { import WSInstrumentation._ - val completionHandle = TraceRecorder.startSegment(HttpClientRequest(request.url, UserTime), basicRequestAttributes(request)) + val completionHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request)) val response = pjp.proceed().asInstanceOf[Future[WSResponse]] @@ -50,7 +50,6 @@ class WSInstrumentation { } object WSInstrumentation { - val UserTime = "UserTime" def basicRequestAttributes(request: WSRequest): Map[String, String] = { Map[String, String]( diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala index 0c3783bb..775d3e26 100644 --- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala @@ -28,9 +28,9 @@ import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpecLike } import kamon.Kamon -import kamon.metrics.{ TraceMetrics, Metrics } -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.TraceMetrics.ElapsedTime +import kamon.metric.{ TraceMetrics, Metrics } +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.TraceMetrics.ElapsedTime class WSInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with OneServerPerSuite { diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 664bd4f9..84621927 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -26,9 +26,9 @@ import scala.util.Random import akka.routing.RoundRobinPool import kamon.trace.TraceRecorder import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import spray.http.{ StatusCodes, Uri } -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.Subscriptions.TickMetricSnapshot object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with KamonTraceDirectives { import scala.concurrent.duration._ @@ -55,8 +55,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil val replier = system.actorOf(Props[Replier].withRouter(RoundRobinPool(nrOfInstances = 2)), "replier") val random = new Random() - val requestCountRecorder = Kamon(Metrics).register(CustomMetric("GetCount"), CustomMetric.histogram(10, 3, Scale.Unit)) - startServer(interface = "localhost", port = 9090) { get { path("test") { @@ -87,7 +85,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil path("ok") { traceName("OK") { complete { - requestCountRecorder.map(_.record(1)) "ok" } } diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index d7d9cf09..d787bda4 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -21,7 +21,7 @@ import org.aspectj.lang.ProceedingJoinPoint import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } import spray.http.HttpHeaders.{ RawHeader, Host } import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware } -import kamon.metrics.TraceMetrics.HttpClientRequest +import kamon.metric.TraceMetrics.HttpClientRequest import kamon.Kamon import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } import akka.actor.ActorRef @@ -30,7 +30,6 @@ import akka.util.Timeout @Aspect class ClientRequestInstrumentation { - import ClientRequestInstrumentation._ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixin: SegmentCompletionHandleAware = SegmentCompletionHandleAware.default @@ -51,7 +50,7 @@ class ClientRequestInstrumentation { if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { val requestAttributes = basicRequestAttributes(request) val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes) + val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) ctx.segmentCompletionHandle = Some(completionHandle) } @@ -102,7 +101,7 @@ class ClientRequestInstrumentation { if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { val requestAttributes = basicRequestAttributes(request) val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes) + val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) responseFuture.onComplete { result ⇒ completionHandle.finish(Map.empty) @@ -139,8 +138,3 @@ class ClientRequestInstrumentation { pjp.proceed(Array(modifiedHeaders)) } } - -object ClientRequestInstrumentation { - val SprayTime = "SprayTime" - val UserTime = "UserTime" -} diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 9469924a..54329645 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -26,13 +26,12 @@ import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader import kamon.Kamon -import kamon.metrics.{ TraceMetrics, Metrics } +import kamon.metric.{ TraceMetrics, Metrics } import spray.client.pipelining -import kamon.metrics.Subscriptions.TickMetricSnapshot -import spray.can.client.ClientRequestInstrumentation +import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ import akka.pattern.pipe -import kamon.metrics.TraceMetrics.TraceMetricSnapshot +import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( @@ -149,7 +148,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.UserTime } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -190,7 +189,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.SprayTime } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -199,14 +198,14 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit } } - def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricSnapshot = { + def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricsSnapshot = { val tickSnapshot = within(timeout) { listener.expectMsgType[TickMetricSnapshot] } val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName)) metricsOption should not be empty - metricsOption.get.asInstanceOf[TraceMetricSnapshot] + metricsOption.get.asInstanceOf[TraceMetricsSnapshot] } def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal) diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala index 7edbbe11..ab9116fd 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala @@ -24,10 +24,11 @@ import kamon.Kamon import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } import spray.http.HttpHeaders.RawHeader import spray.http.{ HttpResponse, HttpRequest } -import kamon.metrics.{ TraceMetrics, Metrics } -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.{ TraceMetrics, Metrics } +import kamon.metric.Subscriptions.TickMetricSnapshot import com.typesafe.config.ConfigFactory -import kamon.metrics.TraceMetrics.ElapsedTime +import kamon.metric.TraceMetrics.ElapsedTime +import kamon.metric.instrument.Histogram class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer { @@ -122,7 +123,7 @@ class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit traceMetrics should not be empty traceMetrics map { metrics ⇒ - metrics(ElapsedTime).numberOfMeasurements should be(1L) + metrics(ElapsedTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) } } diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala index 81242133..65506770 100644 --- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala +++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala @@ -29,11 +29,9 @@ trait TestServer { def buildClientConnectionAndServer: (ActorRef, TestProbe) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) - val bound = within(10 seconds) { - serverHandler.expectMsgType[Bound] - } - + val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = clientConnection(bound) + serverHandler.expectMsgType[Http.Connected] serverHandler.reply(Http.Register(serverHandler.ref)) @@ -50,10 +48,7 @@ trait TestServer { def buildSHostConnectorAndServer: (ActorRef, TestProbe) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) - val bound = within(10 seconds) { - serverHandler.expectMsgType[Bound] - } - + val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = httpHostConnector(bound) (client, serverHandler) diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 1b3daa97..dcd78f78 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,7 +18,7 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import scala.concurrent.duration._ import scala.collection.JavaConverters._ import com.typesafe.config.Config diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index adda18cc..94bab27c 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,12 +20,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import kamon.metric.Subscriptions.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale +import kamon.metric.instrument.{ Counter, Histogram } + class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { import context.system @@ -48,7 +48,7 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; @@ -57,25 +57,26 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) - for (measurement ← metricSnapshot.measurements) { - val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) - dataBuilder.appendMeasurement(key, measurementData) + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + packetBuilder.appendMeasurement(key, encodeStatsDTimer(record.level, record.count)) + } + + case cs: Counter.Snapshot ⇒ + packetBuilder.appendMeasurement(key, encodeStatsDCounter(cs.count)) } } - dataBuilder.flush() + packetBuilder.flush() } - def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = { - def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = - value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - - instrumentType match { - case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ statsDMetricFormat(measurement.count.toString, "c") - } + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" } object StatsDMetricsSender { diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index 9dfd05f7..19d8a80b 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -18,11 +18,12 @@ package kamon.statsd import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ ActorRef, Props, ActorSystem } +import kamon.metric.instrument.Histogram.Precision +import kamon.metric.instrument.{ Histogram, HdrHistogram } import org.scalatest.{ Matchers, WordSpecLike } -import kamon.metrics._ +import kamon.metric._ import akka.io.Udp -import org.HdrHistogram.HdrRecorder -import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metric.Subscriptions.TickMetricSnapshot import java.lang.management.ManagementFactory import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory @@ -31,14 +32,16 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers implicit lazy val system = ActorSystem("statsd-metric-sender-spec", ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes")) + val context = CollectionContext.default + "the StatsDMetricSender" should { "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { val testMetricName = "test-metric" val testMetricKey = buildMetricKey(testMetricName) - val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect())) + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms") @@ -47,12 +50,12 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture { val testMetricName = "test-metric" val testMetricKey = buildMetricKey(testMetricName) - val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) testRecorder.record(11L) testRecorder.record(12L) - val udp = setup(Map(testMetricName -> testRecorder.collect())) + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") @@ -61,11 +64,11 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture { val testMetricName = "test-metric" val testMetricKey = buildMetricKey(testMetricName) - val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect())) + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms|@0.5") @@ -74,7 +77,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { val testMetricName = "test-metric" val testMetricKey = buildMetricKey(testMetricName) - val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit) + val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit) var bytes = testMetricKey.length var level = 0 @@ -84,7 +87,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers bytes += s":$level|ms".length } - val udp = setup(Map(testMetricName -> testRecorder.collect())) + val udp = setup(Map(testMetricName -> testRecorder.collect(context))) udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] @@ -97,8 +100,8 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers val secondTestMetricName = "second-test-metric" val secondTestMetricKey = buildMetricKey(secondTestMetricName) - val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) - val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) + val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) firstTestRecorder.record(10L) firstTestRecorder.record(10L) @@ -108,8 +111,8 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers secondTestRecorder.record(21L) val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(), - secondTestMetricName -> secondTestRecorder.collect())) + firstTestMetricName -> firstTestRecorder.collect(context), + secondTestMetricName -> secondTestRecorder.collect(context))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms") @@ -122,7 +125,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers def buildMetricKey(metricName: String): String = s"kamon.$localhostName.test-metric-category.test-group.$metricName" - def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = { + def setup(metrics: Map[String, MetricSnapshot]): TestProbe = { val udp = TestProbe() val metricsSender = system.actorOf(Props(new StatsDMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) { override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref @@ -149,7 +152,10 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers } metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot { - val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap + type GroupSnapshotType = Histogram.Snapshot + def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ??? + + val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap })) udp diff --git a/project/Settings.scala b/project/Settings.scala index 9abd5553..f2ccc32c 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -33,7 +33,7 @@ object Settings { def formattingPreferences = FormattingPreferences() .setPreference(RewriteArrowSymbols, true) - .setPreference(AlignParameters, true) + .setPreference(AlignParameters, false) .setPreference(AlignSingleLineCaseStatements, true) .setPreference(DoubleIndentClassDeclaration, true) } \ No newline at end of file -- cgit v1.2.3