From 485abe569d23bccf2d263c82b43e59464dc7e834 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- kamon-core/src/main/resources/META-INF/aop.xml | 8 +- kamon-core/src/main/resources/reference.conf | 120 +++++---- .../src/main/scala/kamon/AkkaExtensionSwap.scala | 30 --- kamon-core/src/main/scala/kamon/Kamon.scala | 19 +- .../src/main/scala/kamon/ModuleSupervisor.scala | 48 ++++ kamon-core/src/main/scala/kamon/TimeUnits.scala | 77 ------ .../main/scala/kamon/http/HttpServerMetrics.scala | 114 ++------ .../AtomicHistogramFieldsAccessor.scala | 35 --- .../scala/FutureInstrumentation.scala | 48 ---- .../scalaz/FutureInstrumentation.scala | 47 ---- .../src/main/scala/kamon/metric/Entity.scala | 52 ++++ .../main/scala/kamon/metric/EntityMetrics.scala | 75 ------ .../main/scala/kamon/metric/EntityRecorder.scala | 157 +++++++++++ .../main/scala/kamon/metric/EntitySnapshot.scala | 47 ++++ .../src/main/scala/kamon/metric/MetricKey.scala | 153 +++++++++++ .../main/scala/kamon/metric/MetricsExtension.scala | 166 ++++++------ .../kamon/metric/MetricsExtensionSettings.scala | 100 +++++++ kamon-core/src/main/scala/kamon/metric/Scale.scala | 31 --- .../main/scala/kamon/metric/Subscriptions.scala | 173 ------------ .../kamon/metric/SubscriptionsDispatcher.scala | 115 ++++++++ .../kamon/metric/TickMetricSnapshotBuffer.scala | 49 ++++ .../src/main/scala/kamon/metric/TraceMetrics.scala | 74 ++---- .../src/main/scala/kamon/metric/UserMetrics.scala | 278 +++++++++---------- .../instrument/AtomicHistogramFieldsAccessor.scala | 35 +++ .../scala/kamon/metric/instrument/Counter.scala | 15 +- .../main/scala/kamon/metric/instrument/Gauge.scala | 108 +++++--- .../scala/kamon/metric/instrument/Histogram.scala | 164 ++++++------ .../scala/kamon/metric/instrument/Instrument.scala | 56 ++++ .../metric/instrument/InstrumentFactory.scala | 35 +++ .../metric/instrument/InstrumentSettings.scala | 67 +++++ .../kamon/metric/instrument/MinMaxCounter.scala | 29 +- .../metric/instrument/UnitOfMeasurement.scala | 55 ++++ .../src/main/scala/kamon/metric/package.scala | 34 --- .../scala/kamon/standalone/KamonStandalone.scala | 61 ----- .../src/main/scala/kamon/trace/Incubator.scala | 2 +- .../scala/kamon/trace/MetricsOnlyContext.scala | 37 +-- .../src/main/scala/kamon/trace/Sampler.scala | 3 +- .../src/main/scala/kamon/trace/TraceContext.scala | 66 +++-- .../main/scala/kamon/trace/TraceExtension.scala | 91 ------- .../src/main/scala/kamon/trace/TraceLocal.scala | 4 +- .../src/main/scala/kamon/trace/TraceRecorder.scala | 79 ------ .../main/scala/kamon/trace/TracerExtension.scala | 94 +++++++ .../kamon/trace/TracerExtensionSettings.scala | 30 +++ .../main/scala/kamon/trace/TracingContext.scala | 13 +- .../trace/logging/LogbackTraceTokenConverter.scala | 4 +- .../scala/kamon/trace/logging/MdcKeysSupport.scala | 4 +- .../src/main/scala/kamon/util/ConfigTools.scala | 26 ++ .../src/main/scala/kamon/util/FastDispatch.scala | 22 ++ .../src/main/scala/kamon/util/MapMerge.scala | 27 ++ .../src/main/scala/kamon/util/Timestamp.scala | 85 ++++++ .../kamon/util/TriemapAtomicGetOrElseUpdate.scala | 18 ++ kamon-core/src/test/resources/logback.xml | 24 +- .../scala/FutureInstrumentationSpec.scala | 63 ----- .../scalaz/FutureInstrumentationSpec.scala | 63 ----- .../kamon/metric/SubscriptionsProtocolSpec.scala | 112 ++++---- .../metric/TickMetricSnapshotBufferSpec.scala | 65 +++-- .../test/scala/kamon/metric/TraceMetricsSpec.scala | 107 ++++---- .../test/scala/kamon/metric/UserMetricsSpec.scala | 296 ++++----------------- .../kamon/metric/instrument/CounterSpec.scala | 1 - .../scala/kamon/metric/instrument/GaugeSpec.scala | 66 ++--- .../kamon/metric/instrument/HistogramSpec.scala | 34 +-- .../metric/instrument/MinMaxCounterSpec.scala | 22 +- .../test/scala/kamon/testkit/BaseKamonSpec.scala | 34 +++ .../test/scala/kamon/trace/SimpleTraceSpec.scala | 76 ++---- .../kamon/trace/TraceContextManipulationSpec.scala | 94 +++---- .../test/scala/kamon/trace/TraceLocalSpec.scala | 23 +- .../test/scala/kamon/util/GlobPathFilterSpec.scala | 9 +- .../src/main/scala/testkit/AkkaExtensionSwap.scala | 31 +++ .../scala/testkit/TestProbeInstrumentation.scala | 4 +- 69 files changed, 2265 insertions(+), 2139 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala create mode 100644 kamon-core/src/main/scala/kamon/ModuleSupervisor.scala delete mode 100644 kamon-core/src/main/scala/kamon/TimeUnits.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Entity.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricKey.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/Scale.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/Subscriptions.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/package.scala delete mode 100644 kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/TraceExtension.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TracerExtension.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala create mode 100644 kamon-core/src/main/scala/kamon/util/ConfigTools.scala create mode 100644 kamon-core/src/main/scala/kamon/util/FastDispatch.scala create mode 100644 kamon-core/src/main/scala/kamon/util/MapMerge.scala create mode 100644 kamon-core/src/main/scala/kamon/util/Timestamp.scala create mode 100644 kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala create mode 100644 kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 47ce11d8..854e9437 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -2,19 +2,13 @@ + - - - - - - - diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 8f5a8b45..cd257ebe 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -3,19 +3,11 @@ # ================================== # kamon { - - # Default dispatcher for all Kamon components, unless a more specific one is configured. - default-dispatcher = "kamon.kamon-dispatcher" - - metrics { + metric { # Time interval for collecting all metrics and send the snapshots to all subscribed actors. tick-interval = 1 second - # Time interval for recording values on all registered gauges. - gauge-recording-interval = 100 milliseconds - - # Default size for the LongBuffer that gets allocated for metrics collection and merge. The # value should correspond to the highest number of different buckets with values that might # exist in a single histogram during a metrics collection. The default value of 33792 is a @@ -31,69 +23,79 @@ kamon { # it might be ok for you to turn this error off. disable-aspectj-weaver-missing-error = false + # Specify if entities that do not match any include/exclude filter should be tracked. + track-unmatched-entities = yes - dispatchers { - - # Dispatcher for periodical gauge value recordings. - gauge-recordings = ${kamon.default-dispatcher} - - # Dispatcher for subscriptions and metrics collection actors. - metric-subscriptions = ${kamon.default-dispatcher} - } - - - filters = [ - { - actor { - includes = [] - excludes = [ "system/*", "user/IO-*" ] - } - }, - { - router { - includes = [] - excludes = [ "system/*", "user/IO-*" ] - } - }, - { - trace { - includes = [ "*" ] - excludes = [] - } - }, - { - dispatcher { - includes = [ "default-dispatcher" ] - excludes = [] - } + filters { + trace { + includes = [ "**" ] + excludes = [ ] } - ] + } - precision { - default-histogram-precision { + # Default instrument settings for histograms, min max counters and gaugues. The actual settings to be used when + # creating a instrument is determined by merging the default settings, code settings and specific instrument + # settings using the following priorities (top wins): + + # - any setting in `kamon.metric.instrument-settings` for the given category/instrument. + # - code settings provided when creating the instrument. + # - `default-instrument-settings`. + # + default-instrument-settings { + histogram { + precision = normal + lowest-discernible-value = 1 highest-trackable-value = 3600000000000 - significant-value-digits = 2 } - default-min-max-counter-precision { - refresh-interval = 100 milliseconds + min-max-counter { + precision = normal + lowest-discernible-value = 1 highest-trackable-value = 999999999 - significant-value-digits = 2 + refresh-interval = 100 milliseconds } - default-gauge-precision { + gauge { + precision = normal + lowest-discernible-value = 1 + highest-trackable-value = 3600000000000 refresh-interval = 100 milliseconds - highest-trackable-value = 999999999 - significant-value-digits = 2 } - trace { - elapsed-time = ${kamon.metrics.precision.default-histogram-precision} - segment = ${kamon.metrics.precision.default-histogram-precision} - } + } + + # Custom configurations for category instruments. The settings provided in this section will override the default + # and code instrument settings as explained in the `default-instrument-settings` key. There is no need to provide + # full instrument settings in this section, only the settings that should be overriden must be included. Example: + # if you wish to change the precision and lowest discernible value of the `elapsed-time` instrument for the `trace` + # category, you should include the following configuration in your application.conf file: + # + # kamon.metric.instrument-settings.trace { + # elapsed-time { + # precision = fine + # lowest-discernible-value = 1000 + # } + # } + # + # In this example, the value for the `highest-trackable-value` setting will be either the code setting or the default + # setting, depending on how the `elapsed-time` metric is created. + instrument-settings { + + } + + dispatchers { + + # Dispatcher for the actor that will collect all recorded metrics on every tick and dispatch them to all subscribers. + metric-collection = akka.actor.default-dispatcher + + # Dispatcher for the Kamon refresh scheduler, used by all MinMaxCounters and Gaugues to update their values. + refresh-scheduler = akka.actor.default-dispatcher } } + + + trace { # Level of detail used when recording trace information. The posible values are: @@ -101,7 +103,7 @@ kamon { # to the subscriptors of trace data. # - simple-trace: metrics for all included traces and all segments are recorded and additionally a Trace message # containing the trace and segments details and metadata. - level = metrics-only + level-of-detail = metrics-only # Sampling strategy to apply when the tracing level is set to `simple-trace`. The options are: all, random, ordered # and threshold. The details of each sampler are bellow. @@ -142,7 +144,7 @@ kamon { } # Default dispatcher for all trace module operations - dispatcher = ${kamon.default-dispatcher} + dispatcher = "akka.actor.default-dispatcher" } kamon-dispatcher { diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala deleted file mode 100644 index b7050c59..00000000 --- a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon - -import akka.actor.{ Extension, ActorSystem, ExtensionId } -import java.util.concurrent.ConcurrentHashMap - -object AkkaExtensionSwap { - def swap(system: ActorSystem, key: ExtensionId[_], value: Extension): Unit = { - val extensionsField = system.getClass.getDeclaredField("extensions") - extensionsField.setAccessible(true) - - val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]] - extensions.put(key, value) - } -} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 00026b77..f07f846b 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -16,9 +16,26 @@ package kamon import _root_.akka.actor import _root_.akka.actor._ +import com.typesafe.config.Config +import kamon.metric._ +import kamon.trace.{ Tracer, TracerExtension } + +class Kamon(val actorSystem: ActorSystem) { + val metrics: MetricsExtension = Metrics.get(actorSystem) + val tracer: TracerExtension = Tracer.get(actorSystem) + val userMetrics: UserMetricsExtension = UserMetrics.get(actorSystem) +} object Kamon { trait Extension extends actor.Extension def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system) -} + def apply(actorSystemName: String): Kamon = + apply(ActorSystem(actorSystemName)) + + def apply(actorSystemName: String, config: Config): Kamon = + apply(ActorSystem(actorSystemName, config)) + + def apply(system: ActorSystem): Kamon = + new Kamon(system) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/ModuleSupervisor.scala b/kamon-core/src/main/scala/kamon/ModuleSupervisor.scala new file mode 100644 index 00000000..99d87719 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ModuleSupervisor.scala @@ -0,0 +1,48 @@ +package kamon + +import _root_.akka.actor +import _root_.akka.actor._ +import kamon.ModuleSupervisor.CreateModule + +import scala.concurrent.{ Future, Promise } +import scala.util.Success + +object ModuleSupervisor extends ExtensionId[ModuleSupervisorExtension] with ExtensionIdProvider { + + def lookup(): ExtensionId[_ <: actor.Extension] = ModuleSupervisor + def createExtension(system: ExtendedActorSystem): ModuleSupervisorExtension = new ModuleSupervisorExtensionImpl(system) + + case class CreateModule(name: String, props: Props, childPromise: Promise[ActorRef]) +} + +trait ModuleSupervisorExtension extends actor.Extension { + def createModule(name: String, props: Props): Future[ActorRef] +} + +class ModuleSupervisorExtensionImpl(system: ExtendedActorSystem) extends ModuleSupervisorExtension { + import system.dispatcher + private lazy val supervisor = system.actorOf(Props[ModuleSupervisor], "kamon") + + def createModule(name: String, props: Props): Future[ActorRef] = Future {} flatMap { _: Unit ⇒ + val modulePromise = Promise[ActorRef]() + supervisor ! CreateModule(name, props, modulePromise) + modulePromise.future + } +} + +class ModuleSupervisor extends Actor with ActorLogging { + + def receive = { + case CreateModule(name, props, childPromise) ⇒ createChildModule(name, props, childPromise) + } + + def createChildModule(name: String, props: Props, childPromise: Promise[ActorRef]): Unit = { + context.child(name).map { alreadyAvailableModule ⇒ + log.warning("Received a request to create module [{}] but the module is already available, returning the existent one.") + childPromise.complete(Success(alreadyAvailableModule)) + + } getOrElse { + childPromise.complete(Success(context.actorOf(props, name))) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/TimeUnits.scala deleted file mode 100644 index f2933a11..00000000 --- a/kamon-core/src/main/scala/kamon/TimeUnits.scala +++ /dev/null @@ -1,77 +0,0 @@ -package kamon - -/** - * Epoch time stamp in seconds. - */ -class Timestamp(val seconds: Long) extends AnyVal { - def <(that: Timestamp): Boolean = this.seconds < that.seconds - def >(that: Timestamp): Boolean = this.seconds > that.seconds - def ==(that: Timestamp): Boolean = this.seconds == that.seconds - def >=(that: Timestamp): Boolean = this.seconds >= that.seconds - def <=(that: Timestamp): Boolean = this.seconds <= that.seconds - - override def toString: String = String.valueOf(seconds) + ".seconds" -} - -object Timestamp { - def now: Timestamp = new Timestamp(System.currentTimeMillis() / 1000) - def earlier(l: Timestamp, r: Timestamp): Timestamp = if (l <= r) l else r - def later(l: Timestamp, r: Timestamp): Timestamp = if (l >= r) l else r -} - -/** - * Epoch time stamp in milliseconds. - */ -class MilliTimestamp(val millis: Long) extends AnyVal { - override def toString: String = String.valueOf(millis) + ".millis" - def toTimestamp: Timestamp = new Timestamp(millis / 1000) -} - -object MilliTimestamp { - def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) -} - -/** - * Epoch time stamp in nanoseconds. - * - * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch - * timestamp in nanoseconds. - */ -class NanoTimestamp(val nanos: Long) extends AnyVal { - override def toString: String = String.valueOf(nanos) + ".nanos" -} - -object NanoTimestamp { - def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) -} - -/** - * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() - */ -class RelativeNanoTimestamp(val nanos: Long) extends AnyVal { - override def toString: String = String.valueOf(nanos) + ".nanos" -} - -object RelativeNanoTimestamp { - def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) - def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = - new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) -} - -/** - * Number of nanoseconds that passed between two points in time. - */ -class NanoInterval(val nanos: Long) extends AnyVal { - def <(that: NanoInterval): Boolean = this.nanos < that.nanos - def >(that: NanoInterval): Boolean = this.nanos > that.nanos - def ==(that: NanoInterval): Boolean = this.nanos == that.nanos - def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos - def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos - - override def toString: String = String.valueOf(nanos) + ".nanos" -} - -object NanoInterval { - def default: NanoInterval = new NanoInterval(0L) - def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) -} diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala index 0dd189f6..22f54ab0 100644 --- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala +++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala @@ -1,99 +1,25 @@ package kamon.http -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.Counter -import kamon.metric._ - -import scala.collection.concurrent.TrieMap - -object HttpServerMetrics extends MetricGroupIdentity { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val name: String = "http-server-metrics-recorder" - val category = new MetricGroupCategory { - val name: String = "http-server" - } - - type TraceName = String - type StatusCode = String - - case class CountPerStatusCode(statusCode: String) extends MetricIdentity { - def name: String = statusCode - } - - case class TraceCountPerStatus(traceName: TraceName, statusCode: StatusCode) extends MetricIdentity { - def name: String = traceName + "_" + statusCode - } - - class HttpServerMetricsRecorder extends MetricGroupRecorder { - - private val counters = TrieMap[StatusCode, Counter]() - private val countersPerTrace = TrieMap[TraceName, TrieMap[StatusCode, Counter]]() - - def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L) - - def recordResponse(statusCode: StatusCode, count: Long): Unit = - counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count) - - def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L) - - def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = { - recordResponse(statusCode, count) - countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count) - } - - def collect(context: CollectionContext): HttpServerMetricsSnapshot = { - val countsPerStatusCode = counters.map { - case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) - }.toMap - - val countsPerTraceAndStatus = countersPerTrace.map { - case (traceName, countsPerStatus) ⇒ - (traceName, countsPerStatus.map { case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) }.toMap) - }.toMap - - HttpServerMetricsSnapshot(countsPerStatusCode, countsPerTraceAndStatus) - } - - def cleanup: Unit = {} +import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder } +import kamon.metric.instrument.InstrumentFactory + +/** + * Counts HTTP response status codes into per status code and per trace name + status counters. If recording a HTTP + * response with status 500 for the trace "GetUser", the counter with name "500" as well as the counter with name + * "GetUser_500" will be incremented. + */ +class HttpServerMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + + def recordResponse(statusCode: String): Unit = + counter(statusCode).increment() + + def recordResponse(traceName: String, statusCode: String): Unit = { + recordResponse(statusCode) + counter(traceName + "_" + statusCode).increment() } - - case class HttpServerMetricsSnapshot(countsPerStatusCode: Map[StatusCode, Counter.Snapshot], - countsPerTraceAndStatusCode: Map[TraceName, Map[StatusCode, Counter.Snapshot]]) extends MetricGroupSnapshot { - - type GroupSnapshotType = HttpServerMetricsSnapshot - - def merge(that: HttpServerMetricsSnapshot, context: CollectionContext): HttpServerMetricsSnapshot = { - val combinedCountsPerStatus = combineMaps(countsPerStatusCode, that.countsPerStatusCode)((l, r) ⇒ l.merge(r, context)) - val combinedCountsPerTraceAndStatus = combineMaps(countsPerTraceAndStatusCode, that.countsPerTraceAndStatusCode) { - (leftCounts, rightCounts) ⇒ combineMaps(leftCounts, rightCounts)((l, r) ⇒ l.merge(r, context)) - } - HttpServerMetricsSnapshot(combinedCountsPerStatus, combinedCountsPerTraceAndStatus) - } - - def metrics: Map[MetricIdentity, MetricSnapshot] = { - countsPerStatusCode.map { - case (statusCode, count) ⇒ (CountPerStatusCode(statusCode), count) - } ++ { - for ( - (traceName, countsPerStatus) ← countsPerTraceAndStatusCode; - (statusCode, count) ← countsPerStatus - ) yield (TraceCountPerStatus(traceName, statusCode), count) - } - } - } - - val Factory = HttpServerMetricGroupFactory } -case object HttpServerMetricGroupFactory extends MetricGroupFactory { - - import HttpServerMetrics._ - - type GroupRecorder = HttpServerMetricsRecorder - - def create(config: Config, system: ActorSystem): HttpServerMetricsRecorder = - new HttpServerMetricsRecorder() - -} \ No newline at end of file +object HttpServerMetrics extends EntityRecorderFactory[HttpServerMetrics] { + def category: String = "http-server" + def createRecorder(instrumentFactory: InstrumentFactory): HttpServerMetrics = new HttpServerMetrics(instrumentFactory) +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala deleted file mode 100644 index e79090a8..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } - -trait AtomicHistogramFieldsAccessor { - self: AtomicHistogram ⇒ - - def countsArray(): AtomicLongArray = self.counts - - def unitMagnitude(): Int = self.unitMagnitude - - def subBucketHalfCount(): Int = self.subBucketHalfCount - - def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude -} - -object AtomicHistogramFieldsAccessor { - def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala deleted file mode 100644 index bda2da78..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.scala - -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class FutureInstrumentation { - - @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default - - @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") - def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} - - @After("futureRelatedRunnableCreation(runnable)") - def afterCreation(runnable: TraceContextAware): Unit = { - // Force traceContext initialization. - runnable.traceContext - } - - @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") - def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} - - @Around("futureRelatedRunnableExecution(runnable)") - def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(runnable.traceContext) { - pjp.proceed() - } - } - -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala deleted file mode 100644 index 65caaa8f..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.scalaz - -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class FutureInstrumentation { - - @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+") - def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware = - TraceContextAware.default - - @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)") - def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {} - - @After("futureRelatedCallableCreation(callable)") - def afterCreation(callable: TraceContextAware): Unit = - // Force traceContext initialization. - callable.traceContext - - @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)") - def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {} - - @Around("futureRelatedCallableExecution(callable)") - def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any = - TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) { - pjp.proceed() - } - -} diff --git a/kamon-core/src/main/scala/kamon/metric/Entity.scala b/kamon-core/src/main/scala/kamon/metric/Entity.scala new file mode 100644 index 00000000..962626e0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Entity.scala @@ -0,0 +1,52 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +/** + * Identify a `thing` that is being monitored by Kamon. A [[kamon.metric.Entity]] is used to identify tracked `things` + * in both the metrics recording and reporting sides. Only the name and category fields are used with determining + * equality between two entities. + * + * // TODO: Find a better word for `thing`. + */ +class Entity(val name: String, val category: String, val metadata: Map[String, String]) { + + override def equals(o: Any): Boolean = { + if (this eq o.asInstanceOf[AnyRef]) + true + else if ((o.asInstanceOf[AnyRef] eq null) || !o.isInstanceOf[Entity]) + false + else { + val thatAsEntity = o.asInstanceOf[Entity] + category == thatAsEntity.category && name == thatAsEntity.name + } + } + + override def hashCode: Int = { + var result: Int = name.hashCode + result = 31 * result + category.hashCode + return result + } +} + +object Entity { + def apply(name: String, category: String): Entity = + apply(name, category, Map.empty) + + def apply(name: String, category: String, metadata: Map[String, String]): Entity = + new Entity(name, category, metadata) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala deleted file mode 100644 index 3761f5a5..00000000 --- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.{ LongBuffer } -import akka.actor.ActorSystem -import com.typesafe.config.Config - -trait MetricGroupCategory { - def name: String -} - -trait MetricGroupIdentity { - def name: String - def category: MetricGroupCategory -} - -trait MetricIdentity { - def name: String -} - -trait CollectionContext { - def buffer: LongBuffer -} - -object CollectionContext { - def apply(longBufferSize: Int): CollectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(longBufferSize) - } -} - -trait MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot - def cleanup: Unit -} - -trait MetricSnapshot { - type SnapshotType - - def merge(that: SnapshotType, context: CollectionContext): SnapshotType -} - -trait MetricGroupSnapshot { - type GroupSnapshotType - - def metrics: Map[MetricIdentity, MetricSnapshot] - def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType -} - -private[kamon] trait MetricRecorder { - type SnapshotType <: MetricSnapshot - - def collect(context: CollectionContext): SnapshotType - def cleanup: Unit -} - -trait MetricGroupFactory { - type GroupRecorder <: MetricGroupRecorder - def create(config: Config, system: ActorSystem): GroupRecorder -} - diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala new file mode 100644 index 00000000..7a1972f0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -0,0 +1,157 @@ +package kamon.metric + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +trait EntityRecorder { + def collect(collectionContext: CollectionContext): EntitySnapshot + def cleanup: Unit +} + +trait EntityRecorderFactory[T <: EntityRecorder] { + def category: String + def createRecorder(instrumentFactory: InstrumentFactory): T +} + +abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder { + import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax + + private val _instruments = TrieMap.empty[MetricKey, Instrument] + private def register[T <: Instrument](key: MetricKey, instrument: ⇒ T): T = + _instruments.atomicGetOrElseUpdate(key, instrument).asInstanceOf[T] + + protected def histogram(name: String): Histogram = + register(HistogramKey(name), instrumentFactory.createHistogram(name)) + + protected def histogram(name: String, dynamicRange: DynamicRange): Histogram = + register(HistogramKey(name), instrumentFactory.createHistogram(name, Some(dynamicRange))) + + protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name)) + + protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = + register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange))) + + protected def histogram(key: HistogramKey): Histogram = + register(key, instrumentFactory.createHistogram(key.name)) + + protected def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = + register(key, instrumentFactory.createHistogram(key.name, Some(dynamicRange))) + + protected def removeHistogram(name: String): Unit = + _instruments.remove(HistogramKey(name)) + + protected def removeHistogram(key: HistogramKey): Unit = + _instruments.remove(key) + + protected def minMaxCounter(name: String): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name)) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + + protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name)) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + + protected def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + + protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name)) + + protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange))) + + protected def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval))) + + protected def removeMinMaxCounter(name: String): Unit = + _instruments.remove(MinMaxCounterKey(name)) + + protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit = + _instruments.remove(key) + + protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector)) + + protected def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + + protected def removeGauge(name: String): Unit = + _instruments.remove(GaugeKey(name)) + + protected def removeGauge(key: GaugeKey): Unit = + _instruments.remove(key) + + protected def counter(name: String): Counter = + register(CounterKey(name), instrumentFactory.createCounter()) + + protected def counter(key: CounterKey): Counter = + register(key, instrumentFactory.createCounter()) + + protected def removeCounter(name: String): Unit = + _instruments.remove(CounterKey(name)) + + protected def removeCounter(key: CounterKey): Unit = + _instruments.remove(key) + + def collect(collectionContext: CollectionContext): EntitySnapshot = { + val snapshots = Map.newBuilder[MetricKey, InstrumentSnapshot] + _instruments.foreach { + case (key, instrument) ⇒ snapshots += key -> instrument.collect(collectionContext) + } + + new DefaultEntitySnapshot(snapshots.result()) + } + + def cleanup: Unit = _instruments.values.foreach(_.cleanup) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala new file mode 100644 index 00000000..17c8f4c5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala @@ -0,0 +1,47 @@ +package kamon.metric + +import kamon.metric.instrument.{ Counter, Histogram, CollectionContext, InstrumentSnapshot } +import kamon.util.MapMerge +import scala.reflect.ClassTag + +trait EntitySnapshot { + def metrics: Map[MetricKey, InstrumentSnapshot] + def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot + + def histogram(name: String): Option[Histogram.Snapshot] = + find[HistogramKey, Histogram.Snapshot](name) + + def minMaxCounter(name: String): Option[Histogram.Snapshot] = + find[MinMaxCounterKey, Histogram.Snapshot](name) + + def gauge(name: String): Option[Histogram.Snapshot] = + find[GaugeKey, Histogram.Snapshot](name) + + def counter(name: String): Option[Counter.Snapshot] = + find[CounterKey, Counter.Snapshot](name) + + def histograms: Map[HistogramKey, Histogram.Snapshot] = + filterByType[HistogramKey, Histogram.Snapshot] + + def minMaxCounters: Map[MinMaxCounterKey, Histogram.Snapshot] = + filterByType[MinMaxCounterKey, Histogram.Snapshot] + + def gauges: Map[GaugeKey, Histogram.Snapshot] = + filterByType[GaugeKey, Histogram.Snapshot] + + def counters: Map[CounterKey, Counter.Snapshot] = + filterByType[CounterKey, Counter.Snapshot] + + private def filterByType[K <: MetricKey, V <: InstrumentSnapshot](implicit keyCT: ClassTag[K]): Map[K, V] = + metrics.collect { case (k, v) if keyCT.runtimeClass.isInstance(k) ⇒ (k.asInstanceOf[K], v.asInstanceOf[V]) } + + private def find[K <: MetricKey, V <: InstrumentSnapshot](name: String)(implicit keyCT: ClassTag[K]) = + metrics.find { case (k, v) ⇒ keyCT.runtimeClass.isInstance(k) && k.name == name } map (_._2.asInstanceOf[V]) +} + +class DefaultEntitySnapshot(val metrics: Map[MetricKey, InstrumentSnapshot]) extends EntitySnapshot { + import MapMerge.Syntax + + override def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot = + new DefaultEntitySnapshot(metrics.merge(that.metrics, (l, r) ⇒ l.merge(r, collectionContext))) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala new file mode 100644 index 00000000..a17972df --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala @@ -0,0 +1,153 @@ +package kamon.metric + +import kamon.metric.instrument.{ InstrumentTypes, InstrumentType, UnitOfMeasurement } + +/** + * MetricKeys are used to identify a given metric in entity recorders and snapshots. MetricKeys can be used to encode + * additional metadata for a metric being recorded, as well as the unit of measurement of the data being recorder. + */ +sealed trait MetricKey { + def name: String + def unitOfMeasurement: UnitOfMeasurement + def instrumentType: InstrumentType + def metadata: Map[String, String] +} + +// Wish that there was a shorter way to describe the operations bellow, but apparently there is no way to generalize all +// the apply/create versions that would produce the desired return types when used from Java. + +/** + * MetricKey for all Histogram-based metrics. + */ +case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Histogram +} + +object HistogramKey { + def apply(name: String): HistogramKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): HistogramKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): HistogramKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): HistogramKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): HistogramKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all MinMaxCounter-based metrics. + */ +case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.MinMaxCounter +} + +object MinMaxCounterKey { + def apply(name: String): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all Gauge-based metrics. + */ +case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Gauge +} + +object GaugeKey { + def apply(name: String): GaugeKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): GaugeKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): GaugeKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): GaugeKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): GaugeKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all Counter-based metrics. + */ +case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Counter +} + +object CounterKey { + def apply(name: String): CounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): CounterKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): CounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): CounterKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): CounterKey = + apply(name, unitOfMeasurement, metadata) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index ed55ab06..b738eeb9 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -16,91 +16,119 @@ package kamon.metric -import akka.event.Logging.Error -import akka.event.EventStream +import akka.actor +import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } +import kamon.{ ModuleSupervisor, Kamon } +import kamon.metric.instrument.{ InstrumentFactory, CollectionContext } import scala.collection.concurrent.TrieMap import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metric.Metrics.MetricGroupFilter -import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } -import java.util.concurrent.TimeUnit +import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate } -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap +object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): MetricsExtension = super.get(system) + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system) +} - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) +case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) +trait MetricsExtension extends Kamon.Extension { + def settings: MetricsExtensionSettings + def shouldTrack(entity: Entity): Boolean + def shouldTrack(entityName: String, category: String): Boolean = + shouldTrack(Entity(entityName, category)) - /** Configuration Settings */ - val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] + def unregister(entity: Entity): Unit - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") + def find(entity: Entity): Option[EntityRecorder] + def find(name: String, category: String): Option[EntityRecorder] - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) - else - None - } + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = + subscribe(filter, subscriber, permanently = false) - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity).map(_.cleanup) - } + def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently) - def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit = - subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber) + def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false) - def unsubscribe(subscriber: ActorRef): Unit = - subscriptions.tell(Unsubscribe(subscriber), subscriber) + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ + def unsubscribe(subscriber: ActorRef): Unit + def buildDefaultCollectionContext: CollectionContext + def instrumentFactory(category: String): InstrumentFactory +} - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } +class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension { + import FastDispatch.Syntax - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) - } + val settings = MetricsExtensionSettings(system) - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ + private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] + private val _collectionContext = buildDefaultCollectionContext + private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher) + private val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher", + SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher)) - val filters = config.getObjectList("filters").asScala + def shouldTrack(entity: Entity): Boolean = + settings.entityFilters.get(entity.category).map { + filter ⇒ filter.accept(entity.name) - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) + } getOrElse (settings.trackUnmatchedEntities) - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = { + import TriemapAtomicGetOrElseUpdate.Syntax + val entity = Entity(entityName, recorderFactory.category) - (key, MetricGroupFilter(includes, excludes)) - } + if (shouldTrack(entity)) { + val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory) + val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory)).asInstanceOf[T] + Some(EntityRegistration(entity, recorder)) + } else None + } - allFilters.toMap + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = { + import TriemapAtomicGetOrElseUpdate.Syntax + EntityRegistration(entity, _trackedEntities.atomicGetOrElseUpdate(entity, recorder).asInstanceOf[T]) } + def unregister(entity: Entity): Unit = + _trackedEntities.remove(entity).map(_.cleanup) + + def find(entity: Entity): Option[EntityRecorder] = + _trackedEntities.get(entity) + + def find(name: String, category: String): Option[EntityRecorder] = + find(Entity(name, category)) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = + _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher) + def buildDefaultCollectionContext: CollectionContext = - CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size")) + CollectionContext(settings.defaultCollectionContextBufferSize) + + def instrumentFactory(category: String): InstrumentFactory = + settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) + + /** + * Collect and dispatch. + */ + private def collectSnapshots(): Map[Entity, EntitySnapshot] = { + val builder = Map.newBuilder[Entity, EntitySnapshot] + _trackedEntities.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext))) + } - def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { + builder.result() + } + + /* def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { if (!disableWeaverMissingError) { val weaverMissingMessage = """ @@ -123,22 +151,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage)) } - } + }*/ } -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } - - implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { - def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = - trieMap.get(key) match { - case Some(v) ⇒ v - case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala new file mode 100644 index 00000000..ca1db850 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala @@ -0,0 +1,100 @@ +package kamon.metric + +import akka.actor.ExtendedActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ RefreshScheduler, InstrumentFactory, DefaultInstrumentSettings, InstrumentCustomSettings } +import kamon.util.GlobPathFilter + +import scala.concurrent.duration.FiniteDuration + +/** + * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key. + */ +case class MetricsExtensionSettings( + tickInterval: FiniteDuration, + defaultCollectionContextBufferSize: Int, + trackUnmatchedEntities: Boolean, + entityFilters: Map[String, EntityFilter], + instrumentFactories: Map[String, InstrumentFactory], + defaultInstrumentFactory: InstrumentFactory, + metricCollectionDispatcher: String, + refreshSchedulerDispatcher: String, + refreshScheduler: RefreshScheduler) + +/** + * + */ +case class EntityFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { + def accept(name: String): Boolean = + includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) +} + +object MetricsExtensionSettings { + import kamon.util.ConfigTools.Syntax + import scala.concurrent.duration._ + + def apply(system: ExtendedActorSystem): MetricsExtensionSettings = { + val metricConfig = system.settings.config.getConfig("kamon.metric") + + val tickInterval = metricConfig.getFiniteDuration("tick-interval") + val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size") + val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities") + val entityFilters = loadFilters(metricConfig.getConfig("filters")) + val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings")) + val metricCollectionDispatcher = metricConfig.getString("dispatchers.metric-collection") + val refreshSchedulerDispatcher = metricConfig.getString("dispatchers.refresh-scheduler") + + val refreshScheduler = RefreshScheduler(system.scheduler, system.dispatchers.lookup(refreshSchedulerDispatcher)) + val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler) + val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler) + + MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories, + defaultInstrumentFactory, metricCollectionDispatcher, refreshSchedulerDispatcher, refreshScheduler) + } + + /** + * Load all the default filters configured under the `kamon.metric.filters` configuration key. All filters are + * defined with the entity category as a sub-key of the `kamon.metric.filters` key and two sub-keys to it: includes + * and excludes with lists of string glob patterns as values. Example: + * + * {{{ + * + * kamon.metrics.filters { + * actor { + * includes = ["user/test-actor", "user/service/worker-*"] + * excludes = ["user/IO-*"] + * } + * } + * + * }}} + * + * @return a Map from category name to corresponding entity filter. + */ + def loadFilters(filtersConfig: Config): Map[String, EntityFilter] = { + import scala.collection.JavaConverters._ + + filtersConfig.firstLevelKeys map { category: String ⇒ + val includes = filtersConfig.getStringList(s"$category.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList + val excludes = filtersConfig.getStringList(s"$category.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + + (category, EntityFilter(includes, excludes)) + } toMap + } + + /** + * Load any custom configuration settings defined under the `kamon.metric.instrument-settings` configuration key and + * create InstrumentFactories for them. + * + * @return a Map from category name to InstrumentFactory. + */ + def loadInstrumentFactories(instrumentSettings: Config, defaults: DefaultInstrumentSettings, refreshScheduler: RefreshScheduler): Map[String, InstrumentFactory] = { + instrumentSettings.firstLevelKeys.map { category ⇒ + val categoryConfig = instrumentSettings.getConfig(category) + val customSettings = categoryConfig.firstLevelKeys.map { instrumentName ⇒ + (instrumentName, InstrumentCustomSettings.fromConfig(categoryConfig.getConfig(instrumentName))) + } toMap + + (category, new InstrumentFactory(customSettings, defaults, refreshScheduler)) + } toMap + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala deleted file mode 100644 index 2f27c1a3..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Scale.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -class Scale(val numericValue: Double) extends AnyVal - -object Scale { - val Nano = new Scale(1E-9) - val Micro = new Scale(1E-6) - val Milli = new Scale(1E-3) - val Unit = new Scale(1) - val Kilo = new Scale(1E3) - val Mega = new Scale(1E6) - val Giga = new Scale(1E9) - - def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue -} diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala deleted file mode 100644 index a22e1c21..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala +++ /dev/null @@ -1,173 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor._ -import kamon.metric.Subscriptions._ -import kamon.util.GlobPathFilter -import scala.concurrent.duration.{ FiniteDuration, Duration } -import java.util.concurrent.TimeUnit -import kamon.{ MilliTimestamp, Kamon } -import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer - -class Subscriptions extends Actor { - import context.system - - val flushMetricsSchedule = scheduleFlushMessage() - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - - var lastTick: MilliTimestamp = MilliTimestamp.now - var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty - var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty - - def receive = { - case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) - case Terminated(subscriber) ⇒ unsubscribe(subscriber) - case FlushMetrics ⇒ flush() - } - - def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = { - context.watch(subscriber) - val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection)) - - if (permanent) { - permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine { - permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) - }) - } else { - oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine { - oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) - }) - } - } - - def unsubscribe(subscriber: ActorRef): Unit = { - if (permanentSubscriptions.contains(subscriber)) - permanentSubscriptions = permanentSubscriptions - subscriber - - if (oneShotSubscriptions.contains(subscriber)) - oneShotSubscriptions = oneShotSubscriptions - subscriber - } - - def flush(): Unit = { - val currentTick = MilliTimestamp.now - val snapshots = collectAll() - - dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots) - - lastTick = currentTick - oneShotSubscriptions = Map.empty - } - - def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = { - val allMetrics = Kamon(Metrics).storage - val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot] - - allMetrics.foreach { - case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) - } - - builder.result() - } - - def dispatchSelectedMetrics(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, MetricSelectionFilter], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - - for ((subscriber, filter) ← subscriptions) { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - subscriber ! tickMetrics - } - } - - def scheduleFlushMessage(): Cancellable = { - val config = context.system.settings.config - val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) - context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - } -} - -object Subscriptions { - case object FlushMetrics - case class Unsubscribe(subscriber: ActorRef) - case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false) - case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - - trait MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean - } - - object MetricSelectionFilter { - val empty = new MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = false - } - - implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) { - def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity) - } - } - } - - case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = { - category.equals(identity.category) && globFilter.accept(identity.name) - } - } -} - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext - - def receive = empty - - def empty: Actor.Receive = { - case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) - case FlushBuffer ⇒ // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) ⇒ - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become (buffering(combinedSnapshot)) - - case FlushBuffer ⇒ - receiver ! buffered - context become (empty) - - } - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala new file mode 100644 index 00000000..f616be35 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala @@ -0,0 +1,115 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor._ +import kamon.metric.SubscriptionsDispatcher._ +import kamon.util.{ MilliTimestamp, GlobPathFilter } +import scala.concurrent.duration.FiniteDuration + +/** + * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers. + */ +private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]) extends Actor { + var lastTick = MilliTimestamp.now + var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher) + + def receive = { + case Tick ⇒ processTick() + case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case Terminated(subscriber) ⇒ unsubscribe(subscriber) + } + + def processTick(): Unit = + dispatch(collector()) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = { + def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] = + storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter)) + + context.watch(subscriber) + + if (permanent) + permanentSubscriptions = addSubscription(permanentSubscriptions) + else + oneShotSubscriptions = addSubscription(oneShotSubscriptions) + } + + def unsubscribe(subscriber: ActorRef): Unit = { + permanentSubscriptions = permanentSubscriptions - subscriber + oneShotSubscriptions = oneShotSubscriptions - subscriber + } + + def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = { + val currentTick = MilliTimestamp.now + + dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots) + dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots) + + lastTick = currentTick + oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + } + + def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter], + snapshots: Map[Entity, EntitySnapshot]): Unit = { + + for ((subscriber, filter) ← subscriptions) { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + subscriber ! tickMetrics + } + } +} + +object SubscriptionsDispatcher { + def props(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]): Props = + Props(new SubscriptionsDispatcher(interval, collector)) + + case object Tick + case class Unsubscribe(subscriber: ActorRef) + case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false) + case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot]) + +} + +trait SubscriptionFilter { self ⇒ + + def accept(entity: Entity): Boolean + + final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter { + override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity) + } +} + +object SubscriptionFilter { + val Empty = new SubscriptionFilter { + def accept(entity: Entity): Boolean = false + } + + def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter { + val categoryPattern = new GlobPathFilter(category) + val namePattern = new GlobPathFilter(name) + + def accept(entity: Entity): Boolean = { + categoryPattern.accept(entity.category) && namePattern.accept(entity.name) + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala new file mode 100644 index 00000000..b9127118 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala @@ -0,0 +1,49 @@ +package kamon.metric + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer +import kamon.metric.instrument.CollectionContext +import kamon.util.MapMerge + +import scala.concurrent.duration.FiniteDuration + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + import MapMerge.Syntax + + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext: CollectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext + + def receive = empty + + def empty: Actor.Receive = { + case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) + case FlushBuffer ⇒ // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) ⇒ + val combinedMetrics = buffered.metrics.merge(tickMetrics, (l, r) ⇒ l.merge(r, collectionContext)) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become (buffering(combinedSnapshot)) + + case FlushBuffer ⇒ + receiver ! buffered + context become (empty) + + } + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index eaad6e0d..3da9c1d4 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -16,67 +16,29 @@ package kamon.metric -import akka.actor.ActorSystem -import kamon.metric.instrument.{ Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory, Histogram } -import scala.collection.concurrent.TrieMap -import com.typesafe.config.Config +class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + import TraceMetrics.segmentKey -case class TraceMetrics(name: String) extends MetricGroupIdentity { - val category = TraceMetrics -} - -object TraceMetrics extends MetricGroupCategory { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val name = "trace" - - case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } - - case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) - extends MetricGroupRecorder { - - val segments = TrieMap[MetricIdentity, Histogram]() - - def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = - segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) - - def collect(context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot( - elapsedTime.collect(context), - segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + /** + * Records blah blah + */ + val ElapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds) - def cleanup: Unit = {} - } - - case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) - extends MetricGroupSnapshot { - - type GroupSnapshotType = TraceMetricsSnapshot - - def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) ⇒ l.merge(r, context))) - - def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) - } - - val Factory = TraceMetricGroupFactory + /** + * Records Blah Blah. + * + */ + def segment(name: String, category: String, library: String): Histogram = + histogram(segmentKey(name, category, library)) } -case object TraceMetricGroupFactory extends MetricGroupFactory { - - import TraceMetrics._ - - type GroupRecorder = TraceMetricRecorder - - def create(config: Config, system: ActorSystem): TraceMetricRecorder = { - val settings = config.getConfig("precision.trace") - val elapsedTimeConfig = settings.getConfig("elapsed-time") - val segmentConfig = settings.getConfig("segment") +object TraceMetrics extends EntityRecorderFactory[TraceMetrics] { + def category: String = "trace" + def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory) - new TraceMetricRecorder( - Histogram.fromConfig(elapsedTimeConfig, Scale.Nano), - () ⇒ Histogram.fromConfig(segmentConfig, Scale.Nano)) - } + def segmentKey(name: String, category: String, library: String): HistogramKey = + HistogramKey(name, Time.Nanoseconds, Map("category" -> category, "library" -> library)) } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala index b7ac1ac5..5e1a7629 100644 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -1,189 +1,193 @@ package kamon.metric import akka.actor -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } import kamon.Kamon -import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ import scala.concurrent.duration.FiniteDuration -class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap - import UserMetrics._ - - lazy val metricsExtension = Kamon(Metrics)(system) - val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") - - val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") - val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") - val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") +object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): UserMetricsExtension = super.get(system) + def lookup(): ExtensionId[_ <: actor.Extension] = UserMetrics + def createExtension(system: ExtendedActorSystem): UserMetricsExtension = { + val metricsExtension = Metrics.get(system) + val instrumentFactory = metricsExtension.instrumentFactory(entity.category) + val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory) - def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = { - metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { - UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit)) - }).asInstanceOf[UserHistogramRecorder].histogram + metricsExtension.register(entity, userMetricsExtension).recorder } - def registerHistogram(name: String): Histogram = { - metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { - UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig)) - }).asInstanceOf[UserHistogramRecorder].histogram - } + val entity = Entity("user-metric", "user-metric") +} - def registerCounter(name: String): Counter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), { - UserCounterRecorder(Counter()) - }).asInstanceOf[UserCounterRecorder].counter - } +trait UserMetricsExtension extends Kamon.Extension { + def histogram(name: String): Histogram + def histogram(name: String, dynamicRange: DynamicRange): Histogram + def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram + def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram + def histogram(key: HistogramKey): Histogram + def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram + def removeHistogram(name: String): Unit + def removeHistogram(key: HistogramKey): Unit + + def minMaxCounter(name: String): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter + def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter + def removeMinMaxCounter(name: String): Unit + def removeMinMaxCounter(key: MinMaxCounterKey): Unit + + def gauge(name: String, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def removeGauge(name: String): Unit + def removeGauge(key: GaugeKey): Unit + + def counter(name: String): Counter + def counter(key: CounterKey): Counter + def removeCounter(name: String): Unit + def removeCounter(key: CounterKey): Unit - def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration): MinMaxCounter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { - UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) - }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter - } +} - def registerMinMaxCounter(name: String): MinMaxCounter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { - UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) - }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter - } +class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension { + override def histogram(name: String): Histogram = + super.histogram(name) - def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { - UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) - }).asInstanceOf[UserGaugeRecorder].gauge - } + override def histogram(name: String, dynamicRange: DynamicRange): Histogram = + super.histogram(name, dynamicRange) - def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { - UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) - }).asInstanceOf[UserGaugeRecorder].gauge - } + override def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + super.histogram(name, unitOfMeasurement) - def removeHistogram(name: String): Unit = - metricsExtension.unregister(UserHistogram(name)) + override def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = + super.histogram(name, dynamicRange, unitOfMeasurement) - def removeCounter(name: String): Unit = - metricsExtension.unregister(UserCounter(name)) + override def histogram(key: HistogramKey): Histogram = + super.histogram(key) - def removeMinMaxCounter(name: String): Unit = - metricsExtension.unregister(UserMinMaxCounter(name)) + override def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = + super.histogram(key, dynamicRange) - def removeGauge(name: String): Unit = - metricsExtension.unregister(UserGauge(name)) -} + override def removeHistogram(name: String): Unit = + super.removeHistogram(name) -object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + override def removeHistogram(key: HistogramKey): Unit = + super.removeHistogram(key) - def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + override def minMaxCounter(name: String): MinMaxCounter = + super.minMaxCounter(name) - sealed trait UserMetricGroup - // - // Histograms - // + override def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + super.minMaxCounter(name, dynamicRange) - case class UserHistogram(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserHistograms - } + override def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(name, refreshInterval) - case class UserHistogramRecorder(histogram: Histogram) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserHistogramSnapshot(histogram.collect(context)) + override def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, unitOfMeasurement) - def cleanup: Unit = histogram.cleanup - } + override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, refreshInterval) - case class UserHistogramSnapshot(histogramSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserHistogramSnapshot + override def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, unitOfMeasurement) - def merge(that: UserHistogramSnapshot, context: CollectionContext): UserHistogramSnapshot = - UserHistogramSnapshot(that.histogramSnapshot.merge(histogramSnapshot, context)) + override def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, refreshInterval, unitOfMeasurement) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, histogramSnapshot)) - } + override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, refreshInterval, unitOfMeasurement) - // - // Counters - // + override def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = + super.minMaxCounter(key) - case class UserCounter(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserCounters - } + override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = + super.minMaxCounter(key, dynamicRange) - case class UserCounterRecorder(counter: Counter) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserCounterSnapshot(counter.collect(context)) + override def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(key, refreshInterval) - def cleanup: Unit = counter.cleanup - } + override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(key, dynamicRange, refreshInterval) - case class UserCounterSnapshot(counterSnapshot: Counter.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserCounterSnapshot + override def removeMinMaxCounter(name: String): Unit = + super.removeMinMaxCounter(name) - def merge(that: UserCounterSnapshot, context: CollectionContext): UserCounterSnapshot = - UserCounterSnapshot(that.counterSnapshot.merge(counterSnapshot, context)) + override def removeMinMaxCounter(key: MinMaxCounterKey): Unit = + super.removeMinMaxCounter(key) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((Count, counterSnapshot)) - } + override def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, valueCollector) - // - // MinMaxCounters - // + override def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, valueCollector) - case class UserMinMaxCounter(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserMinMaxCounters - } + override def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, refreshInterval, valueCollector) - case class UserMinMaxCounterRecorder(minMaxCounter: MinMaxCounter) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserMinMaxCounterSnapshot(minMaxCounter.collect(context)) + override def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, unitOfMeasurement, valueCollector) - def cleanup: Unit = minMaxCounter.cleanup - } + override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, refreshInterval, valueCollector) - case class UserMinMaxCounterSnapshot(minMaxCounterSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserMinMaxCounterSnapshot + override def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, unitOfMeasurement, valueCollector) - def merge(that: UserMinMaxCounterSnapshot, context: CollectionContext): UserMinMaxCounterSnapshot = - UserMinMaxCounterSnapshot(that.minMaxCounterSnapshot.merge(minMaxCounterSnapshot, context)) + override def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, refreshInterval, unitOfMeasurement, valueCollector) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, minMaxCounterSnapshot)) - } - - // - // Gauges - // + override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, refreshInterval, unitOfMeasurement, valueCollector) - case class UserGauge(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserGauges - } + override def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, valueCollector) - case class UserGaugeRecorder(gauge: Gauge) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserGaugeSnapshot(gauge.collect(context)) + override def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, dynamicRange, valueCollector) - def cleanup: Unit = gauge.cleanup - } + override def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, refreshInterval, valueCollector) - case class UserGaugeSnapshot(gaugeSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserGaugeSnapshot + override def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, dynamicRange, refreshInterval, valueCollector) - def merge(that: UserGaugeSnapshot, context: CollectionContext): UserGaugeSnapshot = - UserGaugeSnapshot(that.gaugeSnapshot.merge(gaugeSnapshot, context)) + override def removeGauge(name: String): Unit = + super.removeGauge(name) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, gaugeSnapshot)) - } + override def removeGauge(key: GaugeKey): Unit = + super.removeGauge(key) - case object UserHistograms extends MetricGroupCategory { val name: String = "histogram" } - case object UserCounters extends MetricGroupCategory { val name: String = "counter" } - case object UserMinMaxCounters extends MetricGroupCategory { val name: String = "min-max-counter" } - case object UserGauges extends MetricGroupCategory { val name: String = "gauge" } + override def counter(name: String): Counter = + super.counter(name) - case object RecordedValues extends MetricIdentity { val name: String = "values" } - case object Count extends MetricIdentity { val name: String = "count" } + override def counter(key: CounterKey): Counter = + super.counter(key) -} + override def removeCounter(name: String): Unit = + super.removeCounter(name) + override def removeCounter(key: CounterKey): Unit = + super.removeCounter(key) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala new file mode 100644 index 00000000..e79090a8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala @@ -0,0 +1,35 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } + +trait AtomicHistogramFieldsAccessor { + self: AtomicHistogram ⇒ + + def countsArray(): AtomicLongArray = self.counts + + def unitMagnitude(): Int = self.unitMagnitude + + def subBucketHalfCount(): Int = self.subBucketHalfCount + + def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude +} + +object AtomicHistogramFieldsAccessor { + def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index 0f29ba6f..c1b69cbe 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -17,9 +17,8 @@ package kamon.metric.instrument import kamon.jsr166.LongAdder -import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } -trait Counter extends MetricRecorder { +trait Counter extends Instrument { type SnapshotType = Counter.Snapshot def increment(): Unit @@ -29,12 +28,11 @@ trait Counter extends MetricRecorder { object Counter { def apply(): Counter = new LongAdderCounter + def create(): Counter = apply() - trait Snapshot extends MetricSnapshot { - type SnapshotType = Counter.Snapshot - + trait Snapshot extends InstrumentSnapshot { def count: Long - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot } } @@ -55,5 +53,8 @@ class LongAdderCounter extends Counter { } case class CounterSnapshot(count: Long) extends Counter.Snapshot { - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match { + case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount) + case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.") + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index efd7d78f..2341504c 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,70 +1,89 @@ package kamon.metric.instrument -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference } -import akka.actor.{ Cancellable, ActorSystem } -import com.typesafe.config.Config -import kamon.metric.{ CollectionContext, Scale, MetricRecorder } +import akka.actor.Cancellable +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange import scala.concurrent.duration.FiniteDuration -trait Gauge extends MetricRecorder { +trait Gauge extends Instrument { type SnapshotType = Histogram.Snapshot - def record(value: Long) - def record(value: Long, count: Long) + def record(value: Long): Unit + def record(value: Long, count: Long): Unit + def refreshValue(): Unit } object Gauge { - trait CurrentValueCollector { - def currentValue: Long - } - - def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) - val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = { + val underlyingHistogram = Histogram(dynamicRange) + val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector) + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { gauge.refreshValue() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) - gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule) gauge } - def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = - fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = + apply(dynamicRange, refreshInterval, scheduler, valueCollector) - def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { - val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") - fromConfig(config, system)(currentValueCollector) + trait CurrentValueCollector { + def currentValue: Long } - def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = { - import scala.concurrent.duration._ + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) +/** + * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between + * the current observed value and the previously observed value. Should only be used if the observed value is always + * increasing or staying steady, but is never able to decrease. + * + * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between + * the current value and the last value will be returned. + */ +class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector { + @volatile private var _readAtLeastOnce = false + private val _lastObservedValue = new AtomicLong(0) + + def currentValue: Long = { + if (_readAtLeastOnce) { + val wrappedCurrent = wrappedValueCollector.currentValue + val d = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent) + + if (d < 0) + println("HUBO MENOR QUE CERO") + + d + + } else { + _lastObservedValue.set(wrappedValueCollector.currentValue) + _readAtLeastOnce = true + 0 + } - Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector) } +} - def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - fromConfig(config, system, Scale.Unit)(currentValueCollector) - } +object DifferentialValueCollector { + def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector = + new DifferentialValueCollector(wrappedValueCollector) - implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { - def currentValue: Long = f.apply() - } + def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector = + new DifferentialValueCollector(new CurrentValueCollector { + def currentValue: Long = wrappedValueCollector + }) } class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { - val refreshValuesSchedule = new AtomicReference[Cancellable]() + private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]() def record(value: Long): Unit = underlyingHistogram.record(value) @@ -73,10 +92,15 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) def cleanup: Unit = { - if (refreshValuesSchedule.get() != null) - refreshValuesSchedule.get().cancel() + if (automaticValueCollectorSchedule.get() != null) + automaticValueCollectorSchedule.get().cancel() } - def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) + def refreshValue(): Unit = { + val a = currentValueCollector.currentValue + if (a < 0) + println("RECORDING FROM GAUGE => " + a + " - " + currentValueCollector.getClass) + underlyingHistogram.record(a) + } } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index bed75fc8..5c4c7f71 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -17,12 +17,11 @@ package kamon.metric.instrument import java.nio.LongBuffer -import com.typesafe.config.Config import org.HdrHistogram.AtomicHistogramFieldsAccessor +import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange } import org.HdrHistogram.AtomicHistogram -import kamon.metric._ -trait Histogram extends MetricRecorder { +trait Histogram extends Instrument { type SnapshotType = Histogram.Snapshot def record(value: Long) @@ -31,30 +30,40 @@ trait Histogram extends MetricRecorder { object Histogram { - def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = - new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) - - def fromConfig(config: Config): Histogram = { - fromConfig(config, Scale.Unit) - } - - def fromConfig(config: Config, scale: Scale): Histogram = { - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - - new HdrHistogram(1L, highest, significantDigits, scale) - } - - object HighestTrackableValue { - val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L - } - - case class Precision(significantDigits: Int) - object Precision { - val Low = Precision(1) - val Normal = Precision(2) - val Fine = Precision(3) - } + /** + * Scala API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange) + + /** + * Java API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange) + + /** + * DynamicRange is a configuration object used to supply range and precision configuration to a + * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]] + * for more details on how it works and the effects of these configuration values. + * + * @param lowestDiscernibleValue + * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that + * is >= 1. May be internally rounded down to nearest power of 2. + * + * @param highestTrackableValue + * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). + * Must not be larger than (Long.MAX_VALUE/2). + * + * @param precision + * The number of significant decimal digits to which the histogram will maintain value resolution and separation. + * Must be a non-negative integer between 1 and 3. + */ + case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int) trait Record { def level: Long @@ -67,29 +76,28 @@ object Histogram { var rawCompactRecord: Long = 0L } - trait Snapshot extends MetricSnapshot { - type SnapshotType = Histogram.Snapshot + trait Snapshot extends InstrumentSnapshot { def isEmpty: Boolean = numberOfMeasurements == 0 - def scale: Scale def numberOfMeasurements: Long def min: Long def max: Long def sum: Long def percentile(percentile: Double): Long def recordsIterator: Iterator[Record] + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot } object Snapshot { - def empty(targetScale: Scale) = new Snapshot { + val empty = new Snapshot { override def min: Long = 0L override def max: Long = 0L override def sum: Long = 0L override def percentile(percentile: Double): Long = 0L override def recordsIterator: Iterator[Record] = Iterator.empty - override def merge(that: Snapshot, context: CollectionContext): Snapshot = that - override def scale: Scale = targetScale + override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that + override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that override def numberOfMeasurements: Long = 0L } } @@ -100,10 +108,8 @@ object Histogram { * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) - extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) - with Histogram with AtomicHistogramFieldsAccessor { - +class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue, + dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor { import AtomicHistogramFieldsAccessor.totalCountUpdater def record(value: Long): Unit = recordValue(value) @@ -119,7 +125,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign val measurementsArray = Array.ofDim[Long](buffer.limit()) buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) } def getCounts = countsArray().length() @@ -160,7 +166,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign } -case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, +case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) @@ -182,53 +188,61 @@ case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, percentileLevel } - def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { - if (that.isEmpty) this else if (this.isEmpty) that else { - import context.buffer - buffer.clear() + def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot = + merge(that.asInstanceOf[InstrumentSnapshot], context) - val selfIterator = recordsIterator - val thatIterator = that.recordsIterator - var thatCurrentRecord: Histogram.Record = null - var mergedNumberOfMeasurements = 0L + def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match { + case thatSnapshot: CompactHdrSnapshot ⇒ + if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else { + import context.buffer + buffer.clear() - def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null - def addToBuffer(compactRecord: Long): Unit = { - mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) - buffer.put(compactRecord) - } + val selfIterator = recordsIterator + val thatIterator = thatSnapshot.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L - while (selfIterator.hasNext) { - val selfCurrentRecord = selfIterator.next() + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } - // Advance that to no further than the level of selfCurrentRecord - thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord - while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { - addToBuffer(thatCurrentRecord.rawCompactRecord) - thatCurrentRecord = nextOrNull(thatIterator) + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } } - // Include the current record of self and optionally merge if has the same level as thatCurrentRecord - if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { - addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) - thatCurrentRecord = nextOrNull(thatIterator) - } else { - addToBuffer(selfCurrentRecord.rawCompactRecord) + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) } - } - // Include everything that might have been left from that - if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) - while (thatIterator.hasNext) { - addToBuffer(thatIterator.next().rawCompactRecord) + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) } - buffer.flip() - val compactRecords = Array.ofDim[Long](buffer.limit()) - buffer.get(compactRecords) + case other ⇒ + sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.") - new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) - } } @inline private def mergeCompactRecords(left: Long, right: Long): Long = { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala new file mode 100644 index 00000000..8cacc767 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -0,0 +1,56 @@ +package kamon.metric.instrument + +import java.nio.LongBuffer + +import akka.actor.{ Scheduler, Cancellable } +import akka.dispatch.MessageDispatcher +import scala.concurrent.duration.FiniteDuration + +private[kamon] trait Instrument { + type SnapshotType <: InstrumentSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait InstrumentSnapshot { + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot +} + +class InstrumentType private[kamon] (val id: Int) extends AnyVal +object InstrumentTypes { + val Histogram = new InstrumentType(1) + val MinMaxCounter = new InstrumentType(2) + val Gauge = new InstrumentType(3) + val Counter = new InstrumentType(4) +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def apply(longBufferSize: Int): CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(longBufferSize) + } +} + +trait RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable +} + +object RefreshScheduler { + val NoopScheduler = new RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + } + } + + def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = + scheduler.schedule(interval, interval)(refresh.apply())(dispatcher) + } + + def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala new file mode 100644 index 00000000..9b0c85cb --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -0,0 +1,35 @@ +package kamon.metric.instrument + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) { + + private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = { + configurations.get(instrumentName).flatMap { customSettings ⇒ + codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default))) + + } getOrElse (codeSettings.getOrElse(default)) + } + + def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram) + Histogram(settings.dynamicRange) + } + + def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter) + MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler) + } + + def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None, + valueCollector: CurrentValueCollector): Gauge = { + + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge) + Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector) + } + + def createCounter(): Counter = Counter() +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala new file mode 100644 index 00000000..1446a25d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala @@ -0,0 +1,67 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long], + precision: Option[Int], refreshInterval: Option[FiniteDuration]) { + + def combine(that: InstrumentSettings): InstrumentSettings = + InstrumentSettings( + DynamicRange( + lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue), + highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue), + precision.getOrElse(that.dynamicRange.precision)), + refreshInterval.orElse(that.refreshInterval)) +} + +object InstrumentCustomSettings { + import scala.concurrent.duration._ + + def fromConfig(config: Config): InstrumentCustomSettings = + InstrumentCustomSettings( + if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None, + if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None, + if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None, + if (config.hasPath("refresh-interval")) Some(config.getDuration("refresh-interval", TimeUnit.NANOSECONDS).nanos) else None) + +} + +case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration]) + +object InstrumentSettings { + + def readDynamicRange(config: Config): DynamicRange = + DynamicRange( + config.getLong("lowest-discernible-value"), + config.getLong("highest-trackable-value"), + parsePrecision(config.getString("precision"))) + + def parsePrecision(stringValue: String): Int = stringValue match { + case "low" ⇒ 1 + case "normal" ⇒ 2 + case "fine" ⇒ 3 + case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].") + } +} + +case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings) + +object DefaultInstrumentSettings { + + def fromConfig(config: Config): DefaultInstrumentSettings = { + import scala.concurrent.duration._ + + val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None) + val minMaxCounterSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")), + Some(config.getDuration("min-max-counter.refresh-interval", TimeUnit.NANOSECONDS).nanos)) + val gaugeSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("gauge")), + Some(config.getDuration("gauge.refresh-interval", TimeUnit.NANOSECONDS).nanos)) + + DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 4882d2aa..0828c8a9 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -17,16 +17,14 @@ package kamon.metric.instrument */ import java.lang.Math.abs -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ ActorSystem, Cancellable } -import com.typesafe.config.Config +import akka.actor.Cancellable import kamon.jsr166.LongMaxUpdater -import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.metric.instrument.Histogram.DynamicRange import kamon.util.PaddedAtomicLong import scala.concurrent.duration.FiniteDuration -trait MinMaxCounter extends MetricRecorder { +trait MinMaxCounter extends Instrument { override type SnapshotType = Histogram.Snapshot def increment(): Unit @@ -38,29 +36,20 @@ trait MinMaxCounter extends MetricRecorder { object MinMaxCounter { - def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem): MinMaxCounter = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = { + val underlyingHistogram = Histogram(dynamicRange) val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { minMaxCounter.refreshValues() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) minMaxCounter } - def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { - import scala.concurrent.duration._ + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = + apply(dynamicRange, refreshInterval, scheduler) - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) - - apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) - } } class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala new file mode 100644 index 00000000..cf6b8b4c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -0,0 +1,55 @@ +package kamon.metric.instrument + +trait UnitOfMeasurement { + def name: String + def label: String + def factor: Double +} + +object UnitOfMeasurement { + case object Unknown extends UnitOfMeasurement { + val name = "unknown" + val label = "unknown" + val factor = 1D + } + + def isUnknown(uom: UnitOfMeasurement): Boolean = + uom == Unknown + + def isTime(uom: UnitOfMeasurement): Boolean = + uom.isInstanceOf[Time] + +} + +case class Time(factor: Double, label: String) extends UnitOfMeasurement { + val name = "time" + + /** + * Scale a value from this scale factor to a different scale factor. + * + * @param toUnit Time unit of the expected result. + * @param value Value to scale. + * @return Equivalent of value on the target time unit. + */ + def scale(toUnit: Time)(value: Long): Double = + (value * factor) / toUnit.factor +} + +object Time { + val Nanoseconds = Time(1E-9, "n") + val Microseconds = Time(1E-6, "µs") + val Milliseconds = Time(1E-3, "ms") + val Seconds = Time(1, "s") +} + +case class Memory(factor: Double, label: String) extends UnitOfMeasurement { + val name = "bytes" +} + +object Memory { + val Bytes = Memory(1, "b") + val KiloBytes = Memory(1024, "Kb") + val MegaBytes = Memory(1024E2, "Mb") + val GigaBytes = Memory(1024E3, "Gb") +} + diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala deleted file mode 100644 index 43166058..00000000 --- a/kamon-core/src/main/scala/kamon/metric/package.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon - -import scala.annotation.tailrec -import com.typesafe.config.Config - -package object metric { - - @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - if (right.isEmpty) - left - else { - val (key, rightValue) = right.head - val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) - - combineMaps(left.updated(key, value), right.tail)(valueMerger) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala deleted file mode 100644 index 490bc127..00000000 --- a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala +++ /dev/null @@ -1,61 +0,0 @@ -package kamon.standalone - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.Kamon -import kamon.metric.UserMetrics -import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } - -import scala.concurrent.duration.FiniteDuration - -trait KamonStandalone { - private[kamon] def system: ActorSystem - - def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = - Kamon(UserMetrics)(system).registerHistogram(name, precision, highestTrackableValue) - - def registerHistogram(name: String): Histogram = - Kamon(UserMetrics)(system).registerHistogram(name) - - def registerCounter(name: String): Counter = - Kamon(UserMetrics)(system).registerCounter(name) - - def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration): MinMaxCounter = - Kamon(UserMetrics)(system).registerMinMaxCounter(name, precision, highestTrackableValue, refreshInterval) - - def registerMinMaxCounter(name: String): MinMaxCounter = - Kamon(UserMetrics)(system).registerMinMaxCounter(name) - - def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = - Kamon(UserMetrics)(system).registerGauge(name)(currentValueCollector) - - def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = - Kamon(UserMetrics)(system).registerGauge(name, precision, highestTrackableValue, refreshInterval)(currentValueCollector) - - def removeHistogram(name: String): Unit = - Kamon(UserMetrics)(system).removeHistogram(name) - - def removeCounter(name: String): Unit = - Kamon(UserMetrics)(system).removeCounter(name) - - def removeMinMaxCounter(name: String): Unit = - Kamon(UserMetrics)(system).removeMinMaxCounter(name) - - def removeGauge(name: String): Unit = - Kamon(UserMetrics)(system).removeGauge(name) -} - -object KamonStandalone { - - def buildFromConfig(config: Config): KamonStandalone = buildFromConfig(config, "kamon-standalone") - - def buildFromConfig(config: Config, actorSystemName: String): KamonStandalone = new KamonStandalone { - val system: ActorSystem = ActorSystem(actorSystemName, config) - } -} - -object EmbeddedKamonStandalone extends KamonStandalone { - private[kamon] lazy val system = ActorSystem("kamon-standalone") -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala index c39a9984..3b2a3bf9 100644 --- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -19,8 +19,8 @@ package kamon.trace import java.util.concurrent.TimeUnit import akka.actor.{ ActorLogging, Props, Actor, ActorRef } -import kamon.{ NanoInterval, RelativeNanoTimestamp } import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.duration._ diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index 66c6633d..e62178dd 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -18,16 +18,16 @@ package kamon.trace import java.util.concurrent.ConcurrentLinkedQueue -import akka.actor.ActorSystem +import akka.actor.{ ExtensionId, ActorSystem } import akka.event.LoggingAdapter -import kamon.{ RelativeNanoTimestamp, NanoInterval } -import kamon.metric.TraceMetrics.TraceMetricRecorder +import kamon.Kamon.Extension import kamon.metric.{ MetricsExtension, TraceMetrics } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } import scala.annotation.tailrec -private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin, - val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem) +private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, + val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val actorSystem: ActorSystem) extends TraceContext { @volatile private var _name = traceName @@ -48,35 +48,36 @@ private[trace] class MetricsOnlyContext(traceName: String, val token: String, iz def isOpen: Boolean = _isOpen def addMetadata(key: String, value: String): Unit = {} + def lookupExtension[T <: Extension](id: ExtensionId[T]): T = id(actorSystem) + def finish(): Unit = { _isOpen = false - val traceElapsedTime = NanoInterval.since(startRelativeTimestamp) + val traceElapsedTime = NanoInterval.since(startTimestamp) _elapsedTime = traceElapsedTime - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) - metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(traceElapsedTime.nanos) - drainFinishedSegments(traceMetrics) + metricsExtension.register(TraceMetrics, name).map { registration ⇒ + registration.recorder.ElapsedTime.record(traceElapsedTime.nanos) + drainFinishedSegments(registration.recorder) } } def startSegment(segmentName: String, category: String, library: String): Segment = new MetricsOnlySegment(segmentName, category, library) - @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { + @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = { val segment = _finishedSegments.poll() if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos) - drainFinishedSegments(metricRecorder) + recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos) + drainFinishedSegments(recorder) } } protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { - _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration)) + _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration)) if (isClosed) { - metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ - drainFinishedSegments(traceMetrics) + metricsExtension.register(TraceMetrics, name).map { registration ⇒ + drainFinishedSegments(registration.recorder) } } } @@ -118,4 +119,6 @@ private[trace] class MetricsOnlyContext(traceName: String, val token: String, iz def elapsedTime: NanoInterval = _elapsedTime def startTimestamp: RelativeNanoTimestamp = _startTimestamp } -} \ No newline at end of file +} + +case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval) diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index 2308d1d0..5abba221 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -16,8 +16,7 @@ package kamon.trace -import kamon.NanoInterval -import kamon.util.Sequencer +import kamon.util.{ NanoInterval, Sequencer } import scala.concurrent.forkjoin.ThreadLocalRandom trait Sampler { diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 60244eaa..ed8170a9 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -17,26 +17,60 @@ package kamon.trace import java.io.ObjectStreamException -import akka.actor.ActorSystem +import akka.actor.{ ExtensionId, ActorSystem } +import kamon.Kamon.Extension import kamon._ import kamon.metric._ import kamon.trace.TraceContextAware.DefaultTraceContextAware +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } trait TraceContext { def name: String def token: String - def origin: TraceContextOrigin def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty def isOpen: Boolean def isClosed: Boolean = !isOpen - def system: ActorSystem def finish(): Unit def rename(newName: String): Unit + def startSegment(segmentName: String, category: String, library: String): Segment def addMetadata(key: String, value: String) - def startRelativeTimestamp: RelativeNanoTimestamp + + def startTimestamp: RelativeNanoTimestamp + + def lookupExtension[T <: Kamon.Extension](id: ExtensionId[T]): T +} + +object TraceContext { + private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { + override def initialValue(): TraceContext = EmptyTraceContext + } + + def currentContext: TraceContext = + _traceContextStorage.get() + + def setCurrentContext(context: TraceContext): Unit = + _traceContextStorage.set(context) + + def clearCurrentContext: Unit = + _traceContextStorage.remove() + + def withContext[T](context: TraceContext)(code: ⇒ T): T = { + val oldContext = _traceContextStorage.get() + _traceContextStorage.set(context) + + try code finally _traceContextStorage.set(oldContext) + } + + def map[T](f: TraceContext ⇒ T): Option[T] = { + val current = currentContext + if (current.nonEmpty) + Some(f(current)) + else None + } + } trait Segment { @@ -56,16 +90,17 @@ trait Segment { case object EmptyTraceContext extends TraceContext { def name: String = "empty-trace" def token: String = "" - def origin: TraceContextOrigin = TraceContextOrigin.Local def isEmpty: Boolean = true def isOpen: Boolean = false - def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.") def finish(): Unit = {} def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment def addMetadata(key: String, value: String): Unit = {} - def startRelativeTimestamp = new RelativeNanoTimestamp(0L) + def startTimestamp = new RelativeNanoTimestamp(0L) + + override def lookupExtension[T <: Extension](id: ExtensionId[T]): T = + sys.error("Can't lookup extensions on a EmptyTraceContext.") case object EmptySegment extends Segment { val name: String = "empty-segment" @@ -80,14 +115,17 @@ case object EmptyTraceContext extends TraceContext { } } -case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval) - object SegmentCategory { val HttpClient = "http-client" val Database = "database" } +class LOD private[trace] (val level: Int) extends AnyVal +object LOD { + val MetricsOnly = new LOD(1) + val SimpleTrace = new LOD(2) +} + sealed trait LevelOfDetail object LevelOfDetail { case object MetricsOnly extends LevelOfDetail @@ -95,12 +133,6 @@ object LevelOfDetail { case object FullTrace extends LevelOfDetail } -sealed trait TraceContextOrigin -object TraceContextOrigin { - case object Local extends TraceContextOrigin - case object Remote extends TraceContextOrigin -} - trait TraceContextAware extends Serializable { def traceContext: TraceContext } @@ -109,7 +141,7 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - @transient val traceContext = TraceRecorder.currentContext + @transient val traceContext = TraceContext.currentContext // // Beware of this hack, it might bite us in the future! diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala deleted file mode 100644 index 41f022d0..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.util.concurrent.TimeUnit - -import akka.actor._ -import akka.actor -import akka.event.Logging -import kamon._ -import kamon.metric.Metrics -import kamon.util.GlobPathFilter - -class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.trace") - val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) - - val detailLevel: LevelOfDetail = config.getString("level") match { - case "metrics-only" ⇒ LevelOfDetail.MetricsOnly - case "simple-trace" ⇒ LevelOfDetail.SimpleTrace - case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.") - } - - val sampler: Sampler = - if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling - else config.getString("sampling") match { - case "all" ⇒ SampleAll - case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance")) - case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval")) - case "threshold" ⇒ new ThresholdSampler(config.getDuration("threshold-sampler.minimum-elapsed-time", TimeUnit.NANOSECONDS)) - } - - val log = Logging(system, "TraceExtension") - val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") - val incubator = system.actorOf(Incubator.props(subscriptions)) - val metricsExtension = Kamon(Metrics)(system) - - def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext = - newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system) - - def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, - startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = { - def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system) - - if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote) - newMetricsOnlyContext - else { - if (!sampler.shouldTrace) - newMetricsOnlyContext - else - new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system) - } - } - - def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber) - def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) - - private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = - if (sampler.shouldReport(trace.elapsedTime)) - if (trace.shouldIncubate) - incubator ! trace - else - subscriptions ! trace.generateTraceInfo - -} - -object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Trace - def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } -} - -case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) -case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index 84e234f3..057f564e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -42,12 +42,12 @@ object TraceLocal { object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } - def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { + def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceContext.currentContext match { case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) case EmptyTraceContext ⇒ // Can't store in the empty context. } - def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { + def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceContext.currentContext match { case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala deleted file mode 100644 index 703896c3..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon } - -import scala.language.experimental.macros -import java.util.concurrent.atomic.AtomicLong -import kamon.macros.InlineTraceContextMacro - -import scala.util.Try -import java.net.InetAddress -import akka.actor.ActorSystem - -object TraceRecorder { - private val traceContextStorage = new ThreadLocal[TraceContext] { - override def initialValue(): TraceContext = EmptyTraceContext - } - - private val tokenCounter = new AtomicLong - private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - - private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = - Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system) - - def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp) - Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system) - } - - def setContext(context: TraceContext): Unit = traceContextStorage.set(context) - - def clearContext: Unit = traceContextStorage.set(EmptyTraceContext) - - def currentContext: TraceContext = traceContextStorage.get() - - def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = { - val ctx = newTraceContext(name, token, system) - traceContextStorage.set(ctx) - } - - def rename(name: String): Unit = currentContext.rename(name) - - def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T = - withTraceContext(newTraceContext(name, token, system))(thunk) - - def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = { - val oldContext = currentContext - setContext(context) - - try thunk finally setContext(oldContext) - } - - def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { - case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system)) - case EmptyTraceContext ⇒ None - } - - def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] - - def finish(): Unit = currentContext.finish() - -} diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala new file mode 100644 index 00000000..41dcd6bc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.InetAddress +import java.util.concurrent.atomic.AtomicLong + +import akka.actor._ +import akka.actor +import kamon.Kamon +import kamon.metric.{ Metrics, MetricsExtension } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp, GlobPathFilter } + +import scala.util.Try + +object Tracer extends ExtensionId[TracerExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): TracerExtension = super.get(system) + def lookup(): ExtensionId[_ <: actor.Extension] = Tracer + def createExtension(system: ExtendedActorSystem): TracerExtension = new TracerExtensionImpl(system) +} + +trait TracerExtension extends Kamon.Extension { + def newContext(name: String): TraceContext + def newContext(name: String, token: String): TraceContext + def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext + + def subscribe(subscriber: ActorRef): Unit + def unsubscribe(subscriber: ActorRef): Unit +} + +class TracerExtensionImpl(system: ExtendedActorSystem) extends TracerExtension { + private val _settings = TraceSettings(system) + private val _metricsExtension = Metrics.get(system) + + private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + private val _tokenCounter = new AtomicLong + private val _subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + private val _incubator = system.actorOf(Incubator.props(_subscriptions)) + + private def newToken: String = + _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) + + def newContext(name: String): TraceContext = + createTraceContext(name) + + def newContext(name: String, token: String): TraceContext = + createTraceContext(name, token) + + def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = + createTraceContext(name, token, timestamp, isOpen, isLocal) + + private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, + isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = { + + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, _metricsExtension, system) + + if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal) + newMetricsOnlyContext + else { + if (!_settings.sampler.shouldTrace) + newMetricsOnlyContext + else + new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, _metricsExtension, this, system, dispatchTracingContext) + } + } + + def subscribe(subscriber: ActorRef): Unit = _subscriptions ! TraceSubscriptions.Subscribe(subscriber) + def unsubscribe(subscriber: ActorRef): Unit = _subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) + + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (_settings.sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + _incubator ! trace + else + _subscriptions ! trace.generateTraceInfo + +} + +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala new file mode 100644 index 00000000..e6c2d3ef --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala @@ -0,0 +1,30 @@ +package kamon.trace + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem + +case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler) + +object TraceSettings { + def apply(system: ActorSystem): TraceSettings = { + val tracerConfig = system.settings.config.getConfig("kamon.trace") + + val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else tracerConfig.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.interval")) + case "threshold" ⇒ new ThresholdSampler(tracerConfig.getDuration("threshold-sampler.minimum-elapsed-time", TimeUnit.NANOSECONDS)) + } + + TraceSettings(detailLevel, sampler) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala index 31ab282d..dd4c3c1a 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -21,14 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorSystem import akka.event.LoggingAdapter -import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp } import kamon.metric.MetricsExtension import scala.collection.concurrent.TrieMap -private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, - startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem) - extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) { +private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, + isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, + traceExtension: TracerExtensionImpl, system: ActorSystem, traceInfoSink: TracingContext ⇒ Unit) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension, system) { private val _openSegments = new AtomicInteger(0) private val _startTimestamp = NanoTimestamp.now @@ -46,7 +47,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo override def finish(): Unit = { super.finish() - traceExtension.dispatchTracingContext(this) + traceInfoSink(this) } override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { @@ -66,7 +67,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo while (currentSegments.hasNext()) { val segment = currentSegments.next() if (segment.isClosed) - segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) else log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) } diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index f052f009..961c3099 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -17,11 +17,11 @@ package kamon.trace.logging import ch.qos.logback.classic.pattern.ClassicConverter import ch.qos.logback.classic.spi.ILoggingEvent -import kamon.trace.TraceRecorder +import kamon.trace.TraceContext class LogbackTraceTokenConverter extends ClassicConverter { def convert(event: ILoggingEvent): String = { - val ctx = TraceRecorder.currentContext + val ctx = TraceContext.currentContext if (ctx.isEmpty) "undefined" else diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala index 4f4efa4d..4970d97e 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -17,14 +17,14 @@ package kamon.trace.logging import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext } import org.slf4j.MDC trait MdcKeysSupport { def withMdc[A](thunk: ⇒ A): A = { - val keys = copyToMdc(TraceRecorder.currentContext) + val keys = copyToMdc(TraceContext.currentContext) try thunk finally keys.foreach(key ⇒ MDC.remove(key)) } diff --git a/kamon-core/src/main/scala/kamon/util/ConfigTools.scala b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala new file mode 100644 index 00000000..9810428e --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala @@ -0,0 +1,26 @@ +package kamon.util + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config + +import scala.concurrent.duration.FiniteDuration + +object ConfigTools { + implicit class Syntax(val config: Config) extends AnyVal { + // We are using the deprecated .getNanoseconds option to keep Kamon source code compatible with + // versions of Akka using older typesafe-config versions. + + def getFiniteDuration(path: String): FiniteDuration = + FiniteDuration(config.getNanoseconds(path), TimeUnit.NANOSECONDS) + + def firstLevelKeys: Set[String] = { + import scala.collection.JavaConverters._ + + config.entrySet().asScala.map { + case entry ⇒ entry.getKey.takeWhile(_ != '.') + } toSet + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/util/FastDispatch.scala b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala new file mode 100644 index 00000000..8f23164a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala @@ -0,0 +1,22 @@ +package kamon.util + +import akka.actor.ActorRef + +import scala.concurrent.{ ExecutionContext, Future } + +/** + * Extension for Future[ActorRef]. Try to dispatch a message to a Future[ActorRef] in the same thread if it has already + * completed or do the regular scheduling otherwise. Specially useful when using the ModuleSupervisor extension to + * create actors. + */ +object FastDispatch { + implicit class Syntax(val target: Future[ActorRef]) extends AnyVal { + + def fastDispatch(message: Any)(implicit ec: ExecutionContext): Unit = + if (target.isCompleted) + target.value.get.map(_ ! message) + else + target.map(_ ! message) + } + +} diff --git a/kamon-core/src/main/scala/kamon/util/MapMerge.scala b/kamon-core/src/main/scala/kamon/util/MapMerge.scala new file mode 100644 index 00000000..b7f18788 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/MapMerge.scala @@ -0,0 +1,27 @@ +package kamon.util + +object MapMerge { + + /** + * Merge to immutable maps with the same key and value types, using the provided valueMerge function. + */ + implicit class Syntax[K, V](val map: Map[K, V]) extends AnyVal { + def merge(that: Map[K, V], valueMerge: (V, V) ⇒ V): Map[K, V] = { + val merged = Map.newBuilder[K, V] + + map.foreach { + case (key, value) ⇒ + val mergedValue = that.get(key).map(v ⇒ valueMerge(value, v)).getOrElse(value) + merged += key -> mergedValue + } + + that.foreach { + case kv @ (key, _) if !map.contains(key) ⇒ merged += kv + case other ⇒ // ignore, already included. + } + + merged.result(); + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/util/Timestamp.scala b/kamon-core/src/main/scala/kamon/util/Timestamp.scala new file mode 100644 index 00000000..4ff031a6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/Timestamp.scala @@ -0,0 +1,85 @@ +package kamon.util + +/** + * Epoch time stamp. + */ +class Timestamp(val seconds: Long) extends AnyVal { + def <(that: Timestamp): Boolean = this.seconds < that.seconds + def >(that: Timestamp): Boolean = this.seconds > that.seconds + def ==(that: Timestamp): Boolean = this.seconds == that.seconds + def >=(that: Timestamp): Boolean = this.seconds >= that.seconds + def <=(that: Timestamp): Boolean = this.seconds <= that.seconds + + override def toString: String = String.valueOf(seconds) + ".seconds" +} + +object Timestamp { + def now: Timestamp = new Timestamp(System.currentTimeMillis() / 1000) + def earlier(l: Timestamp, r: Timestamp): Timestamp = if (l <= r) l else r + def later(l: Timestamp, r: Timestamp): Timestamp = if (l >= r) l else r +} + +/** + * Epoch time stamp in milliseconds. + */ +class MilliTimestamp(val millis: Long) extends AnyVal { + override def toString: String = String.valueOf(millis) + ".millis" + + def toTimestamp: Timestamp = new Timestamp(millis / 1000) + def toRelativeNanoTimestamp: RelativeNanoTimestamp = { + val diff = (System.currentTimeMillis() - millis) * 1000000 + new RelativeNanoTimestamp(System.nanoTime() - diff) + } +} + +object MilliTimestamp { + def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) +} + +/** + * Epoch time stamp in nanoseconds. + * + * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch + * timestamp in nanoseconds. + */ +class NanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoTimestamp { + def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) +} + +/** + * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() + */ +class RelativeNanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" + + def toMilliTimestamp: MilliTimestamp = + new MilliTimestamp(System.currentTimeMillis - ((System.nanoTime - nanos) / 1000000)) +} + +object RelativeNanoTimestamp { + def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) + def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = + new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) +} + +/** + * Number of nanoseconds that passed between two points in time. + */ +class NanoInterval(val nanos: Long) extends AnyVal { + def <(that: NanoInterval): Boolean = this.nanos < that.nanos + def >(that: NanoInterval): Boolean = this.nanos > that.nanos + def ==(that: NanoInterval): Boolean = this.nanos == that.nanos + def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos + def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos + + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoInterval { + def default: NanoInterval = new NanoInterval(0L) + def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) +} diff --git a/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala new file mode 100644 index 00000000..cd457cdc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala @@ -0,0 +1,18 @@ +package kamon.util + +import scala.collection.concurrent.TrieMap + +object TriemapAtomicGetOrElseUpdate { + + /** + * Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate]] method. More details on + * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]]. + */ + implicit class Syntax[K, V](val trieMap: TrieMap[K, V]) extends AnyVal { + def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = + trieMap.get(key) match { + case Some(v) ⇒ v + case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) + } + } +} diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml index eb578346..dd623d61 100644 --- a/kamon-core/src/test/resources/logback.xml +++ b/kamon-core/src/test/resources/logback.xml @@ -1,17 +1,17 @@ + + true + - - true - + - - - %date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n - - - - - - + + + %date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n + + + + + diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala deleted file mode 100644 index 31afd3ff..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation.scala - -import akka.actor.ActorSystem -import akka.testkit.TestKit -import kamon.trace.TraceRecorder -import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } -import org.scalatest.{ Matchers, OptionValues, WordSpecLike } - -import scala.concurrent.Future - -class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers - with ScalaFutures with PatienceConfiguration with OptionValues { - - implicit val execContext = system.dispatcher - - "a Future created with FutureTracing" should { - "capture the TraceContext available when created" which { - "must be available when executing the future's body" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future(TraceRecorder.currentContext) - - (future, TraceRecorder.currentContext) - } - - whenReady(future)(ctxInFuture ⇒ - ctxInFuture should equal(testTraceContext)) - } - - "must be available when executing callbacks on the future" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future("Hello Kamon!") - // The TraceContext is expected to be available during all intermediate processing. - .map(_.length) - .flatMap(len ⇒ Future(len.toString)) - .map(s ⇒ TraceRecorder.currentContext) - - (future, TraceRecorder.currentContext) - } - - whenReady(future)(ctxInFuture ⇒ - ctxInFuture should equal(testTraceContext)) - } - } - } -} - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala deleted file mode 100644 index 29bf96f8..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation.scalaz - -import akka.actor.ActorSystem -import akka.testkit.TestKit -import kamon.trace.TraceRecorder -import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } -import org.scalatest.{ Matchers, OptionValues, WordSpecLike } -import scalaz.concurrent.Future -import java.util.concurrent.Executors - -class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers - with ScalaFutures with PatienceConfiguration with OptionValues { - - implicit val execContext = Executors.newCachedThreadPool() - - "a Future created with FutureTracing" should { - "capture the TraceContext available when created" which { - "must be available when executing the future's body" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future(TraceRecorder.currentContext).start - - (future, TraceRecorder.currentContext) - } - - val ctxInFuture = future.run - ctxInFuture should equal(testTraceContext) - } - - "must be available when executing callbacks on the future" in { - - val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") { - val future = Future("Hello Kamon!") - // The TraceContext is expected to be available during all intermediate processing. - .map(_.length) - .flatMap(len ⇒ Future(len.toString)) - .map(s ⇒ TraceRecorder.currentContext) - - (future.start, TraceRecorder.currentContext) - } - - val ctxInFuture = future.run - ctxInFuture should equal(testTraceContext) - } - } - } -} - diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala index 9144725e..40200685 100644 --- a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala @@ -1,128 +1,110 @@ package kamon.metric import akka.actor._ -import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } +import akka.testkit.{ TestProbe, ImplicitSender } import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ -class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - |} - """.stripMargin)) +class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-spec") with ImplicitSender { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + |} + """.stripMargin) - val metricsExtension = Kamon(Metrics)(system) - import metricsExtension.{ register, subscribe, unsubscribe } + val metricsModule = kamon.metrics + import metricsModule.{ register, subscribe, unsubscribe } "the Subscriptions messaging protocol" should { "allow subscribing for a single tick" in { val subscriber = TestProbe() - register(TraceMetrics("one-shot"), TraceMetrics.Factory) - subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false) + register(TraceMetrics, "one-shot") + subscribe("trace", "one-shot", subscriber.ref, permanently = false) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace")) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() subscriber.expectNoMsg(1 second) } "allow subscribing permanently to a metric" in { val subscriber = TestProbe() - register(TraceMetrics("permanent"), TraceMetrics.Factory) - subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true) + register(TraceMetrics, "permanent") + subscribe("trace", "permanent", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("permanent")) - subscriber.expectNoMsg(1 second) + tickSnapshot.metrics.keys should contain(Entity("permanent", "trace")) } } "allow subscribing to metrics matching a glob pattern" in { val subscriber = TestProbe() - register(TraceMetrics("include-one"), TraceMetrics.Factory) - register(TraceMetrics("exclude-two"), TraceMetrics.Factory) - register(TraceMetrics("include-three"), TraceMetrics.Factory) - subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true) + register(TraceMetrics, "include-one") + register(TraceMetrics, "exclude-two") + register(TraceMetrics, "include-three") + subscribe("trace", "include-*", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(2) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) - subscriber.expectNoMsg(1 second) + tickSnapshot.metrics.keys should contain(Entity("include-one", "trace")) + tickSnapshot.metrics.keys should contain(Entity("include-three", "trace")) } } "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in { val subscriber = TestProbe() - register(TraceMetrics("include-one"), TraceMetrics.Factory) - register(TraceMetrics("exclude-two"), TraceMetrics.Factory) - register(TraceMetrics("include-three"), TraceMetrics.Factory) - subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true) - subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true) + register(TraceMetrics, "include-one") + register(TraceMetrics, "exclude-two") + register(TraceMetrics, "include-three") + subscribe("trace", "include-one", subscriber.ref, permanently = true) + subscribe("trace", "include-three", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(2) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) + tickSnapshot.metrics.keys should contain(Entity("include-one", "trace")) + tickSnapshot.metrics.keys should contain(Entity("include-three", "trace")) } } "allow un-subscribing a subscriber" in { val subscriber = TestProbe() - register(TraceMetrics("one-shot"), TraceMetrics.Factory) - subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true) + register(TraceMetrics, "one-shot") + subscribe("trace", "one-shot", subscriber.ref, permanently = true) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace")) unsubscribe(subscriber.ref) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() subscriber.expectNoMsg(1 second) } + } - "watch all subscribers and un-subscribe them if they die" in { - val subscriber = TestProbe() - val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref))) - watch(forwarderSubscriber) - register(TraceMetrics("one-shot"), TraceMetrics.Factory) - subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true) - - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) - - forwarderSubscriber ! PoisonPill - expectTerminated(forwarderSubscriber) - - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - subscriber.expectNoMsg(2 seconds) - } + def subscriptionsActor: ActorRef = { + val listener = TestProbe() + system.actorSelection("/user/kamon/kamon-metrics").tell(Identify(1), listener.ref) + listener.expectMsgType[ActorIdentity].ref.get } } diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala index a9197ab5..2e1f246d 100644 --- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala @@ -17,32 +17,29 @@ package kamon.metric import com.typesafe.config.ConfigFactory -import kamon.{ MilliTimestamp, Kamon } -import kamon.metric.instrument.Histogram import kamon.metric.instrument.Histogram.MutableRecord -import org.scalatest.{ Matchers, WordSpecLike } -import akka.testkit.{ ImplicitSender, TestKitBase } -import akka.actor.ActorSystem +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp +import akka.testkit.ImplicitSender import scala.concurrent.duration._ -import kamon.metric.Subscriptions.TickMetricSnapshot - -class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [ "non-tracked-trace"] - | } - | } - | ] - |} - """.stripMargin)) +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot + +class TickMetricSnapshotBufferSpec extends BaseKamonSpec("trace-metrics-spec") with ImplicitSender { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 + | + | filters { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace" ] + | } + | } + |} + """.stripMargin) "the TickMetricSnapshotBuffer" should { "merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures { @@ -74,7 +71,7 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma mergedSnapshot.to.millis should equal(4000) mergedSnapshot.metrics should not be ('empty) - val testMetricSnapshot = mergedSnapshot.metrics(testTraceIdentity).metrics(TraceMetrics.ElapsedTime).asInstanceOf[Histogram.Snapshot] + val testMetricSnapshot = mergedSnapshot.metrics(testTraceIdentity).histogram("elapsed-time").get testMetricSnapshot.min should equal(10) testMetricSnapshot.max should equal(300) testMetricSnapshot.numberOfMeasurements should equal(6) @@ -88,23 +85,23 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma } trait SnapshotFixtures { - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - val testTraceIdentity = TraceMetrics("buffer-spec-test-trace") - val traceRecorder = Kamon(Metrics).register(testTraceIdentity, TraceMetrics.Factory).get + val collectionContext = kamon.metrics.buildDefaultCollectionContext + val testTraceIdentity = Entity("buffer-spec-test-trace", "trace") + val traceRecorder = kamon.metrics.register(TraceMetrics, "buffer-spec-test-trace").get.recorder val firstEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map.empty) val secondEmpty = TickMetricSnapshot(new MilliTimestamp(2000), new MilliTimestamp(3000), Map.empty) val thirdEmpty = TickMetricSnapshot(new MilliTimestamp(3000), new MilliTimestamp(4000), Map.empty) - traceRecorder.elapsedTime.record(10L) - traceRecorder.elapsedTime.record(20L) - traceRecorder.elapsedTime.record(30L) + traceRecorder.ElapsedTime.record(10L) + traceRecorder.ElapsedTime.record(20L) + traceRecorder.ElapsedTime.record(30L) val firstNonEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map( (testTraceIdentity -> traceRecorder.collect(collectionContext)))) - traceRecorder.elapsedTime.record(10L) - traceRecorder.elapsedTime.record(10L) - traceRecorder.elapsedTime.record(300L) + traceRecorder.ElapsedTime.record(10L) + traceRecorder.ElapsedTime.record(10L) + traceRecorder.ElapsedTime.record(300L) val secondNonEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map( (testTraceIdentity -> traceRecorder.collect(collectionContext)))) } diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala index cd10f2d3..793c0112 100644 --- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala @@ -1,92 +1,83 @@ package kamon.metric -import akka.actor.ActorSystem -import akka.testkit.{ ImplicitSender, TestKitBase } +import akka.testkit.ImplicitSender import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.TraceMetrics.TraceMetricsSnapshot -import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.testkit.BaseKamonSpec +import kamon.trace.TraceContext +import kamon.metric.instrument.Histogram -class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [ "non-tracked-trace"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | default-min-max-counter-precision { - | refresh-interval = 1 second - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) +class TraceMetricsSpec extends BaseKamonSpec("trace-metrics-spec") with ImplicitSender { + import TraceMetricsSpec.SegmentSyntax + + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 + | + | filters { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + |} + """.stripMargin) "the TraceMetrics" should { "record the elapsed time between a trace creation and finish" in { for (repetitions ← 1 to 10) { - TraceRecorder.withNewTraceContext("record-elapsed-time") { - TraceRecorder.finish() + TraceContext.withContext(newContext("record-elapsed-time")) { + TraceContext.currentContext.finish() } } - val snapshot = takeSnapshotOf("record-elapsed-time") - snapshot.elapsedTime.numberOfMeasurements should be(10) - snapshot.segments shouldBe empty + val snapshot = takeSnapshotOf("record-elapsed-time", "trace") + snapshot.histogram("elapsed-time").get.numberOfMeasurements should be(10) } "record the elapsed time for segments that occur inside a given trace" in { - TraceRecorder.withNewTraceContext("trace-with-segments") { - val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library") + TraceContext.withContext(newContext("trace-with-segments")) { + val segment = TraceContext.currentContext.startSegment("test-segment", "test-category", "test-library") segment.finish() - TraceRecorder.finish() + TraceContext.currentContext.finish() } - val snapshot = takeSnapshotOf("trace-with-segments") - snapshot.elapsedTime.numberOfMeasurements should be(1) + val snapshot = takeSnapshotOf("trace-with-segments", "trace") + snapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1) + snapshot.segment("test-segment", "test-category", "test-library").numberOfMeasurements should be(1) } "record the elapsed time for segments that finish after their correspondent trace has finished" in { - val segment = TraceRecorder.withNewTraceContext("closing-segment-after-trace") { - val s = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library") - TraceRecorder.finish() + val segment = TraceContext.withContext(newContext("closing-segment-after-trace")) { + val s = TraceContext.currentContext.startSegment("test-segment", "test-category", "test-library") + TraceContext.currentContext.finish() s } - val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") - beforeFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(1) + val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace", "trace") + beforeFinishSegmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) beforeFinishSegmentSnapshot.segments.size should be(0) segment.finish() - val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace") - afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0) + val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace", "trace") + afterFinishSegmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(0) afterFinishSegmentSnapshot.segments.size should be(1) - afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1) + afterFinishSegmentSnapshot.segment("test-segment", "test-category", "test-library").numberOfMeasurements should be(1) } } +} + +object TraceMetricsSpec { + implicit class SegmentSyntax(val entitySnapshot: EntitySnapshot) extends AnyVal { + def segments: Map[HistogramKey, Histogram.Snapshot] = { + entitySnapshot.histograms.filterKeys(_.metadata.contains("category")) + } - def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { - val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - recorder.get.collect(collectionContext) + def segment(name: String, category: String, library: String): Histogram.Snapshot = + segments(TraceMetrics.segmentKey(name, category, library)) } } diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala index 6c4fe3fb..a345c6a9 100644 --- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala @@ -1,310 +1,110 @@ package kamon.metric -import akka.actor.{ Props, ActorSystem } -import akka.testkit.{ ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.UserMetrics._ -import kamon.metric.instrument.{ Histogram, Counter, MinMaxCounter, Gauge } -import kamon.metric.instrument.Histogram.MutableRecord -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ -class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | precision { - | default-histogram-precision { - | highest-trackable-value = 10000 - | significant-value-digits = 2 - | } - | - | default-min-max-counter-precision { - | refresh-interval = 1 hour - | highest-trackable-value = 1000 - | significant-value-digits = 2 - | } - | - | default-gauge-precision { - | refresh-interval = 1 hour - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) +class UserMetricsSpec extends BaseKamonSpec("user-metrics-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 + |} + """.stripMargin) "the UserMetrics extension" should { + "allow registering a fully configured Histogram and get the same Histogram if registering again" in { - val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) - val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) + val histogramA = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2)) + val histogramB = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2)) histogramA shouldBe theSameInstanceAs(histogramB) } "return the original Histogram when registering a fully configured Histogram for second time but with different settings" in { - val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) - val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Fine, 50000L) + val histogramA = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2)) + val histogramB = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 50000, 2)) histogramA shouldBe theSameInstanceAs(histogramB) } "allow registering a Histogram that takes the default configuration from the kamon.metrics.precision settings" in { - Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration") + kamon.userMetrics.histogram("histogram-with-default-configuration") } "allow registering a Counter and get the same Counter if registering again" in { - val counterA = Kamon(UserMetrics).registerCounter("counter") - val counterB = Kamon(UserMetrics).registerCounter("counter") + val counterA = kamon.userMetrics.counter("counter") + val counterB = kamon.userMetrics.counter("counter") counterA shouldBe theSameInstanceAs(counterB) } "allow registering a fully configured MinMaxCounter and get the same MinMaxCounter if registering again" in { - val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) - val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) + val minMaxCounterA = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second) + val minMaxCounterB = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second) minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB) } "return the original MinMaxCounter when registering a fully configured MinMaxCounter for second time but with different settings" in { - val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) - val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Fine, 5000L, 1 second) + val minMaxCounterA = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second) + val minMaxCounterB = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 50000, 2), 1 second) minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB) } "allow registering a MinMaxCounter that takes the default configuration from the kamon.metrics.precision settings" in { - Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-default-configuration") + kamon.userMetrics.minMaxCounter("min-max-counter-with-default-configuration") } "allow registering a fully configured Gauge and get the same Gauge if registering again" in { - val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) { + val gaugeA = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, { () ⇒ 1L - } + }) - val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) { + val gaugeB = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, { () ⇒ 1L - } + }) gaugeA shouldBe theSameInstanceAs(gaugeB) } "return the original Gauge when registering a fully configured Gauge for second time but with different settings" in { - val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) { + val gaugeA = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, { () ⇒ 1L - } + }) - val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Fine, 5000L, 1 second) { + val gaugeB = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, { () ⇒ 1L - } + }) gaugeA shouldBe theSameInstanceAs(gaugeB) } "allow registering a Gauge that takes the default configuration from the kamon.metrics.precision settings" in { - Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") { + kamon.userMetrics.gauge("gauge-with-default-configuration", { () ⇒ 2L - } + }) } "allow un-registering user metrics" in { - val metricsExtension = Kamon(Metrics) - Kamon(UserMetrics).registerCounter("counter-for-remove") - Kamon(UserMetrics).registerHistogram("histogram-for-remove") - Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-for-remove") - Kamon(UserMetrics).registerGauge("gauge-for-remove") { () ⇒ 2L } - - metricsExtension.storage.keys should contain(UserCounter("counter-for-remove")) - metricsExtension.storage.keys should contain(UserHistogram("histogram-for-remove")) - metricsExtension.storage.keys should contain(UserMinMaxCounter("min-max-counter-for-remove")) - metricsExtension.storage.keys should contain(UserGauge("gauge-for-remove")) - - Kamon(UserMetrics).removeCounter("counter-for-remove") - Kamon(UserMetrics).removeHistogram("histogram-for-remove") - Kamon(UserMetrics).removeMinMaxCounter("min-max-counter-for-remove") - Kamon(UserMetrics).removeGauge("gauge-for-remove") - - metricsExtension.storage.keys should not contain (UserCounter("counter-for-remove")) - metricsExtension.storage.keys should not contain (UserHistogram("histogram-for-remove")) - metricsExtension.storage.keys should not contain (UserMinMaxCounter("min-max-counter-for-remove")) - metricsExtension.storage.keys should not contain (UserGauge("gauge-for-remove")) - } - - "include all the registered metrics in the a tick snapshot and reset all recorders" in { - Kamon(Metrics).subscribe(UserHistograms, "*", testActor, permanently = true) - Kamon(Metrics).subscribe(UserCounters, "*", testActor, permanently = true) - Kamon(Metrics).subscribe(UserMinMaxCounters, "*", testActor, permanently = true) - Kamon(Metrics).subscribe(UserGauges, "*", testActor, permanently = true) - - val histogramWithSettings = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) - val histogramWithDefaultConfiguration = Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration") - val counter = Kamon(UserMetrics).registerCounter("counter") - val minMaxCounterWithSettings = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second) - val gauge = Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") { () ⇒ 2L } - - // lets put some values on those metrics - histogramWithSettings.record(10) - histogramWithSettings.record(20, 100) - histogramWithDefaultConfiguration.record(40) - - counter.increment() - counter.increment(16) - - minMaxCounterWithSettings.increment(43) - minMaxCounterWithSettings.decrement() - - gauge.record(15) - - Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics - val firstSnapshot = expectMsgType[TickMetricSnapshot].metrics - - firstSnapshot.keys should contain allOf ( - UserHistogram("histogram-with-settings"), - UserHistogram("histogram-with-default-configuration")) - - firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (10) - firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (20) - firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(101) - firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf ( - MutableRecord(10, 1), - MutableRecord(20, 100)) - - firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (40) - firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (40) - firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1) - firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only ( - MutableRecord(40, 1)) - - firstSnapshot(UserCounter("counter")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(17) - - firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (43) - firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3) - firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf ( - MutableRecord(0, 1), // min - MutableRecord(42, 1), // current - MutableRecord(43, 1)) // max - - firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0) - firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3) - firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only ( - MutableRecord(0, 3)) // min, max and current - - firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (15) - firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (15) - firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1) - firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only ( - MutableRecord(15, 1)) // only the manually recorded value - - Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics - val secondSnapshot = expectMsgType[TickMetricSnapshot].metrics - - secondSnapshot.keys should contain allOf ( - UserHistogram("histogram-with-settings"), - UserHistogram("histogram-with-default-configuration")) - - secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0) - secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(0) - secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream shouldBe empty - - secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0) - secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(0) - secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream shouldBe empty - - secondSnapshot(UserCounter("counter")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(0) - - secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (42) - secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (42) - secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3) - secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only ( - MutableRecord(42, 3)) // max - - secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0) - secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3) - secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only ( - MutableRecord(0, 3)) // min, max and current - - secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0) - secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(0) - secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream shouldBe empty - - Kamon(Metrics).unsubscribe(testActor) - } - - "generate a snapshot that can be merged with another" in { - val buffer = system.actorOf(TickMetricSnapshotBuffer.props(1 hours, testActor)) - Kamon(Metrics).subscribe(UserHistograms, "*", buffer, permanently = true) - Kamon(Metrics).subscribe(UserCounters, "*", buffer, permanently = true) - Kamon(Metrics).subscribe(UserMinMaxCounters, "*", buffer, permanently = true) - Kamon(Metrics).subscribe(UserGauges, "*", buffer, permanently = true) - - val histogram = Kamon(UserMetrics).registerHistogram("histogram-for-merge") - val counter = Kamon(UserMetrics).registerCounter("counter-for-merge") - val minMaxCounter = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-for-merge") - val gauge = Kamon(UserMetrics).registerGauge("gauge-for-merge") { () ⇒ 10L } - - histogram.record(100) - counter.increment(10) - minMaxCounter.increment(50) - minMaxCounter.decrement(10) - gauge.record(50) - - Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics - Thread.sleep(2000) // Make sure that the snapshots are taken before proceeding - - val extraCounter = Kamon(UserMetrics).registerCounter("extra-counter") - histogram.record(200) - extraCounter.increment(20) - minMaxCounter.increment(40) - minMaxCounter.decrement(50) - gauge.record(70) - - Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics - Thread.sleep(2000) // Make sure that the metrics are buffered. - buffer ! TickMetricSnapshotBuffer.FlushBuffer - val snapshot = expectMsgType[TickMetricSnapshot].metrics - - snapshot.keys should contain(UserHistogram("histogram-for-merge")) - - snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (100) - snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (200) - snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(2) - snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf ( - MutableRecord(100, 1), - MutableRecord(200, 1)) - - snapshot(UserCounter("counter-for-merge")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(10) - snapshot(UserCounter("extra-counter")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(20) - - snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0) - snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (80) - snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(6) - snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf ( - MutableRecord(0, 1), // min in first snapshot - MutableRecord(30, 2), // min and current in second snapshot - MutableRecord(40, 1), // current in first snapshot - MutableRecord(50, 1), // max in first snapshot - MutableRecord(80, 1)) // max in second snapshot - - snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (50) - snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (70) - snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(2) - snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf ( - MutableRecord(50, 1), - MutableRecord(70, 1)) - - Kamon(Metrics).unsubscribe(testActor) + val counter = kamon.userMetrics.counter("counter-for-remove") + val histogram = kamon.userMetrics.histogram("histogram-for-remove") + val minMaxCounter = kamon.userMetrics.minMaxCounter("min-max-counter-for-remove") + val gauge = kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L }) + + kamon.userMetrics.removeCounter("counter-for-remove") + kamon.userMetrics.removeHistogram("histogram-for-remove") + kamon.userMetrics.removeMinMaxCounter("min-max-counter-for-remove") + kamon.userMetrics.removeGauge("gauge-for-remove") + + counter should not be (theSameInstanceAs(kamon.userMetrics.counter("counter-for-remove"))) + histogram should not be (theSameInstanceAs(kamon.userMetrics.histogram("histogram-for-remove"))) + minMaxCounter should not be (theSameInstanceAs(kamon.userMetrics.minMaxCounter("min-max-counter-for-remove"))) + gauge should not be (theSameInstanceAs(kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L }))) } } } diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala index 1a93e1f6..500a69c5 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala @@ -2,7 +2,6 @@ package kamon.metric.instrument import java.nio.LongBuffer -import kamon.metric.CollectionContext import org.scalatest.{ Matchers, WordSpec } class CounterSpec extends WordSpec with Matchers { diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala index 9192d999..bd39652c 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala @@ -1,72 +1,62 @@ package kamon.metric.instrument import java.util.concurrent.atomic.AtomicLong - -import akka.actor.ActorSystem -import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.{ Metrics, Scale, CollectionContext } -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ -class GaugeSpec extends WordSpecLike with Matchers { - implicit val system = ActorSystem("gauge-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | flush-interval = 1 hour - | default-collection-context-buffer-size = 10 - | precision { - | default-gauge-precision { - | refresh-interval = 100 milliseconds - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) +class GaugeSpec extends BaseKamonSpec("gauge-spec") { "a Gauge" should { - "automatically record the current value using the configured refresh-interval" in { - val numberOfValuesRecorded = new AtomicLong(0) - val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) } - + "automatically record the current value using the configured refresh-interval" in new GaugeFixture { + val (numberOfValuesRecorded, gauge) = createGauge() Thread.sleep(1.second.toMillis) + numberOfValuesRecorded.get() should be(10L +- 1L) gauge.cleanup } - "stop automatically recording after a call to cleanup" in { - val numberOfValuesRecorded = new AtomicLong(0) - val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) } - + "stop automatically recording after a call to cleanup" in new GaugeFixture { + val (numberOfValuesRecorded, gauge) = createGauge() Thread.sleep(1.second.toMillis) + gauge.cleanup numberOfValuesRecorded.get() should be(10L +- 1L) Thread.sleep(1.second.toMillis) + numberOfValuesRecorded.get() should be(10L +- 1L) } - "produce a Histogram snapshot including all the recorded values" in { - val numberOfValuesRecorded = new AtomicLong(0) - val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) } + "produce a Histogram snapshot including all the recorded values" in new GaugeFixture { + val (numberOfValuesRecorded, gauge) = createGauge() Thread.sleep(1.second.toMillis) gauge.cleanup - val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext) + val snapshot = gauge.collect(kamon.metrics.buildDefaultCollectionContext) snapshot.numberOfMeasurements should be(10L +- 1L) snapshot.min should be(1) snapshot.max should be(10L +- 1L) } - "not record the current value when doing a collection" in { - val numberOfValuesRecorded = new AtomicLong(0) - val gauge = Gauge(Histogram.Precision.Normal, 10000L, Scale.Unit, 1 hour, system)(() ⇒ numberOfValuesRecorded.addAndGet(1)) - - val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext) + "not record the current value when doing a collection" in new GaugeFixture { + val (numberOfValuesRecorded, gauge) = createGauge(10 seconds) + val snapshot = gauge.collect(kamon.metrics.buildDefaultCollectionContext) snapshot.numberOfMeasurements should be(0) numberOfValuesRecorded.get() should be(0) } } + + trait GaugeFixture { + def createGauge(refreshInterval: FiniteDuration = 100 millis): (AtomicLong, Gauge) = { + val recordedValuesCounter = new AtomicLong(0) + val gauge = Gauge(DynamicRange(1, 100, 2), refreshInterval, kamon.metrics.settings.refreshScheduler, { + () ⇒ recordedValuesCounter.addAndGet(1) + }) + + (recordedValuesCounter, gauge) + } + + } } diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala index c3060d4a..9a50e149 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala @@ -18,22 +18,13 @@ package kamon.metric.instrument import java.nio.LongBuffer -import com.typesafe.config.ConfigFactory -import kamon.metric.CollectionContext +import kamon.metric.instrument.Histogram.DynamicRange import org.scalatest.{ Matchers, WordSpec } import scala.util.Random class HistogramSpec extends WordSpec with Matchers { - val histogramConfig = ConfigFactory.parseString( - """ - | - |highest-trackable-value = 100000 - |significant-value-digits = 2 - | - """.stripMargin) - "a Histogram" should { "allow record values within the configured range" in new HistogramFixture { histogram.record(1000) @@ -109,7 +100,7 @@ class HistogramSpec extends WordSpec with Matchers { val buffer: LongBuffer = LongBuffer.allocate(10000) } - val histogram = Histogram.fromConfig(histogramConfig) + val histogram = Histogram(DynamicRange(1, 100000, 2)) def takeSnapshot(): Histogram.Snapshot = histogram.collect(collectionContext) } @@ -119,17 +110,20 @@ class HistogramSpec extends WordSpec with Matchers { val buffer: LongBuffer = LongBuffer.allocate(10000) } - val controlHistogram = Histogram.fromConfig(histogramConfig) - val histogramA = Histogram.fromConfig(histogramConfig) - val histogramB = Histogram.fromConfig(histogramConfig) + val controlHistogram = Histogram(DynamicRange(1, 100000, 2)) + val histogramA = Histogram(DynamicRange(1, 100000, 2)) + val histogramB = Histogram(DynamicRange(1, 100000, 2)) + + def takeSnapshotFrom(histogram: Histogram): InstrumentSnapshot = histogram.collect(collectionContext) - def takeSnapshotFrom(histogram: Histogram): Histogram.Snapshot = histogram.collect(collectionContext) + def assertEquals(left: InstrumentSnapshot, right: InstrumentSnapshot): Unit = { + val leftSnapshot = left.asInstanceOf[Histogram.Snapshot] + val rightSnapshot = right.asInstanceOf[Histogram.Snapshot] - def assertEquals(left: Histogram.Snapshot, right: Histogram.Snapshot): Unit = { - left.numberOfMeasurements should equal(right.numberOfMeasurements) - left.min should equal(right.min) - left.max should equal(right.max) - left.recordsIterator.toStream should contain theSameElementsAs (right.recordsIterator.toStream) + leftSnapshot.numberOfMeasurements should equal(rightSnapshot.numberOfMeasurements) + leftSnapshot.min should equal(rightSnapshot.min) + leftSnapshot.max should equal(rightSnapshot.max) + leftSnapshot.recordsIterator.toStream should contain theSameElementsAs (rightSnapshot.recordsIterator.toStream) } } } diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala index 2c11adc3..7a3d7aa3 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala @@ -19,19 +19,11 @@ import java.nio.LongBuffer import akka.actor._ import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory -import kamon.metric.CollectionContext -import kamon.metric.instrument.Histogram.MutableRecord -import org.scalatest.{ Matchers, WordSpecLike } - -class MinMaxCounterSpec extends WordSpecLike with Matchers { - implicit val system = ActorSystem("min-max-counter-spec") - val minMaxCounterConfig = ConfigFactory.parseString( - """ - |refresh-interval = 1 hour - |highest-trackable-value = 1000 - |significant-value-digits = 2 - """.stripMargin) +import kamon.metric.instrument.Histogram.{ DynamicRange, MutableRecord } +import kamon.testkit.BaseKamonSpec +import scala.concurrent.duration._ + +class MinMaxCounterSpec extends BaseKamonSpec("min-max-counter-spec") { "the MinMaxCounter" should { "track ascending tendencies" in new MinMaxCounterFixture { @@ -104,7 +96,7 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers { workers foreach (_ ! "increment") for (refresh ← 1 to 1000) { collectCounterSnapshot() - Thread.sleep(10) + Thread.sleep(1) } monitor.expectNoMsg() @@ -117,7 +109,7 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers { val buffer: LongBuffer = LongBuffer.allocate(64) } - val mmCounter = MinMaxCounter.fromConfig(minMaxCounterConfig, system).asInstanceOf[PaddedMinMaxCounter] + val mmCounter = MinMaxCounter(DynamicRange(1, 1000, 2), 1 hour, kamon.metrics.settings.refreshScheduler) mmCounter.cleanup // cancel the refresh schedule def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext) diff --git a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala new file mode 100644 index 00000000..20fc3ed5 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala @@ -0,0 +1,34 @@ +package kamon.testkit + +import akka.testkit.{ ImplicitSender, TestKitBase } +import akka.actor.ActorSystem +import com.typesafe.config.{ Config, ConfigFactory } +import kamon.Kamon +import kamon.metric.{ SubscriptionsDispatcher, EntitySnapshot, MetricsExtensionImpl } +import kamon.trace.TraceContext +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +abstract class BaseKamonSpec(actorSystemName: String) extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { + lazy val kamon = Kamon(actorSystemName, config) + lazy val collectionContext = kamon.metrics.buildDefaultCollectionContext + implicit lazy val system: ActorSystem = kamon.actorSystem + + def config: Config = + ConfigFactory.load() + + def newContext(name: String): TraceContext = + kamon.tracer.newContext(name) + + def newContext(name: String, token: String): TraceContext = + kamon.tracer.newContext(name, token) + + def takeSnapshotOf(name: String, category: String): EntitySnapshot = { + val recorder = kamon.metrics.find(name, category).get + recorder.collect(collectionContext) + } + + def flushSubscriptions(): Unit = + system.actorSelection("/user/kamon/subscriptions-dispatcher") ! SubscriptionsDispatcher.Tick + + override protected def afterAll(): Unit = system.shutdown() +} diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala index cda9cad7..0cb4ce34 100644 --- a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala @@ -16,58 +16,40 @@ package kamon.trace -import akka.actor.ActorSystem -import akka.testkit.{ ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory import kamon.Kamon -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ -class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("simple-trace-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [ "non-tracked-trace"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | default-min-max-counter-precision { - | refresh-interval = 1 second - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - | - |kamon.trace { - | level = simple-trace - | sampling = all - |} - """.stripMargin)) +class SimpleTraceSpec extends BaseKamonSpec("simple-trace-spec") { + + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | metric { + | tick-interval = 1 hour + | } + | + | trace { + | level-of-detail = simple-trace + | sampling = all + | } + |} + """.stripMargin) "the simple tracing" should { "send a TraceInfo when the trace has finished and all segments are finished" in { - Kamon(Trace)(system).subscribe(testActor) + Kamon(Tracer)(system).subscribe(testActor) - TraceRecorder.withNewTraceContext("simple-trace-without-segments") { - TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish() - TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test").finish() - TraceRecorder.finish() + TraceContext.withContext(newContext("simple-trace-without-segments")) { + TraceContext.currentContext.startSegment("segment-one", "test-segment", "test").finish() + TraceContext.currentContext.startSegment("segment-two", "test-segment", "test").finish() + TraceContext.currentContext.finish() } val traceInfo = expectMsgType[TraceInfo] - Kamon(Trace)(system).unsubscribe(testActor) + Kamon(Tracer)(system).unsubscribe(testActor) traceInfo.name should be("simple-trace-without-segments") traceInfo.segments.size should be(2) @@ -76,12 +58,12 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I } "incubate the tracing context if there are open segments after finishing" in { - Kamon(Trace)(system).subscribe(testActor) + Kamon(Tracer)(system).subscribe(testActor) - val secondSegment = TraceRecorder.withNewTraceContext("simple-trace-without-segments") { - TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish() - val segment = TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test") - TraceRecorder.finish() + val secondSegment = TraceContext.withContext(newContext("simple-trace-without-segments")) { + TraceContext.currentContext.startSegment("segment-one", "test-segment", "test").finish() + val segment = TraceContext.currentContext.startSegment("segment-two", "test-segment", "test") + TraceContext.currentContext.finish() segment } @@ -90,7 +72,7 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I within(10 seconds) { val traceInfo = expectMsgType[TraceInfo] - Kamon(Trace)(system).unsubscribe(testActor) + Kamon(Tracer)(system).unsubscribe(testActor) traceInfo.name should be("simple-trace-without-segments") traceInfo.segments.size should be(2) diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala index 0875deff..9d7725b7 100644 --- a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala @@ -1,94 +1,80 @@ package kamon.trace -import akka.actor.ActorSystem -import akka.testkit.{ ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.testkit.BaseKamonSpec -class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [ "non-tracked-trace"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | default-min-max-counter-precision { - | refresh-interval = 1 second - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) +class TraceContextManipulationSpec extends BaseKamonSpec("trace-metrics-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + | + | filters { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + |} + """.stripMargin) - "the TraceRecorder api" should { + "the TraceContext api" should { "allow starting a trace within a specified block of code, and only within that block of code" in { - val createdContext = TraceRecorder.withNewTraceContext("start-context") { - TraceRecorder.currentContext should not be empty - TraceRecorder.currentContext + val createdContext = TraceContext.withContext(newContext("start-context")) { + TraceContext.currentContext should not be empty + TraceContext.currentContext } - TraceRecorder.currentContext shouldBe empty + TraceContext.currentContext shouldBe empty createdContext.name shouldBe ("start-context") } "allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in { - val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) { - TraceRecorder.currentContext should not be empty - TraceRecorder.currentContext + val createdContext = TraceContext.withContext(newContext("start-context-with-token", "token-1")) { + TraceContext.currentContext should not be empty + TraceContext.currentContext } - TraceRecorder.currentContext shouldBe empty + TraceContext.currentContext shouldBe empty createdContext.name shouldBe ("start-context-with-token") createdContext.token should be("token-1") } "allow providing a TraceContext and make it available within a block of code" in { - val createdContext = TraceRecorder.withNewTraceContext("manually-provided-trace-context") { TraceRecorder.currentContext } + val createdContext = newContext("manually-provided-trace-context") - TraceRecorder.currentContext shouldBe empty - TraceRecorder.withTraceContext(createdContext) { - TraceRecorder.currentContext should be(createdContext) + TraceContext.currentContext shouldBe empty + TraceContext.withContext(createdContext) { + TraceContext.currentContext should be(createdContext) } - TraceRecorder.currentContext shouldBe empty + TraceContext.currentContext shouldBe empty } "allow renaming a trace" in { - val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") { - TraceRecorder.rename("renamed-trace") - TraceRecorder.currentContext + val createdContext = TraceContext.withContext(newContext("trace-before-rename")) { + TraceContext.currentContext.rename("renamed-trace") + TraceContext.currentContext } - TraceRecorder.currentContext shouldBe empty + TraceContext.currentContext shouldBe empty createdContext.name shouldBe ("renamed-trace") } "allow creating a segment within a trace" in { - val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") { - val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-category", "segment-library") - TraceRecorder.currentContext + val createdContext = TraceContext.withContext(newContext("trace-with-segments")) { + val segment = TraceContext.currentContext.startSegment("segment-1", "segment-1-category", "segment-library") + TraceContext.currentContext } - TraceRecorder.currentContext shouldBe empty + TraceContext.currentContext shouldBe empty createdContext.name shouldBe ("trace-with-segments") } "allow renaming a segment" in { - TraceRecorder.withNewTraceContext("trace-with-renamed-segment") { - val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label", "segment-library") + TraceContext.withContext(newContext("trace-with-renamed-segment")) { + val segment = TraceContext.currentContext.startSegment("original-segment-name", "segment-label", "segment-library") segment.name should be("original-segment-name") segment.rename("new-segment-name") diff --git a/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala index f2b25820..8bacca83 100644 --- a/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala @@ -16,24 +16,21 @@ package kamon.trace -import akka.actor.ActorSystem -import akka.testkit.TestKit +import kamon.testkit.BaseKamonSpec import kamon.trace.TraceLocal.AvailableToMdc import kamon.trace.logging.MdcKeysSupport import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.{ Matchers, OptionValues, WordSpecLike } +import org.scalatest.OptionValues import org.slf4j.MDC -class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordSpecLike with Matchers - with PatienceConfiguration with OptionValues with MdcKeysSupport { - +class TraceLocalSpec extends BaseKamonSpec("trace-local-spec") with PatienceConfiguration with OptionValues with MdcKeysSupport { val SampleTraceLocalKeyAvailableToMDC = AvailableToMdc("someKey") object SampleTraceLocalKey extends TraceLocal.TraceLocalKey { type ValueType = String } "the TraceLocal storage" should { "allow storing and retrieving values" in { - TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local") { + TraceContext.withContext(newContext("store-and-retrieve-trace-local")) { val testString = "Hello World" TraceLocal.store(SampleTraceLocalKey)(testString) @@ -42,7 +39,7 @@ class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordS } "return None when retrieving a non existent key" in { - TraceRecorder.withNewTraceContext("non-existent-key") { + TraceContext.withContext(newContext("non-existent-key")) { TraceLocal.retrieve(SampleTraceLocalKey) should equal(None) } } @@ -53,22 +50,22 @@ class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordS "be attached to the TraceContext when it is propagated" in { val testString = "Hello World" - val testContext = TraceRecorder.withNewTraceContext("manually-propagated-trace-local") { + val testContext = TraceContext.withContext(newContext("manually-propagated-trace-local")) { TraceLocal.store(SampleTraceLocalKey)(testString) TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString) - TraceRecorder.currentContext + TraceContext.currentContext } /** No TraceLocal should be available here */ TraceLocal.retrieve(SampleTraceLocalKey) should equal(None) - TraceRecorder.withTraceContext(testContext) { + TraceContext.withContext(testContext) { TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString) } } "allow retrieve a value from the MDC when was created a key with AvailableToMdc(cool-key)" in { - TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local-and-copy-to-mdc") { + TraceContext.withContext(newContext("store-and-retrieve-trace-local-and-copy-to-mdc")) { val testString = "Hello MDC" TraceLocal.store(SampleTraceLocalKeyAvailableToMDC)(testString) @@ -81,7 +78,7 @@ class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordS } "allow retrieve a value from the MDC when was created a key with AvailableToMdc.storeForMdc(String, String)" in { - TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local-and-copy-to-mdc") { + TraceContext.withContext(newContext("store-and-retrieve-trace-local-and-copy-to-mdc")) { val testString = "Hello MDC" TraceLocal.storeForMdc("someKey", testString) diff --git a/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala b/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala index 83992e61..ab98d0ac 100644 --- a/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala +++ b/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala @@ -40,6 +40,13 @@ class GlobPathFilterSpec extends WordSpecLike with Matchers { filter.accept("/user/something/otherActor") shouldBe false } + "match all expressions in the same levelss" in { + val filter = new GlobPathFilter("**") + + filter.accept("GET: /ping") shouldBe true + filter.accept("GET: /ping/pong") shouldBe true + } + "match all expressions and crosses the path boundaries" in { val filter = new GlobPathFilter("/user/actor-**") @@ -51,7 +58,7 @@ class GlobPathFilterSpec extends WordSpecLike with Matchers { filter.accept("/user/something/otherActor") shouldBe false } - "match exactly one characterr" in { + "match exactly one character" in { val filter = new GlobPathFilter("/user/actor-?") filter.accept("/user/actor-1") shouldBe true diff --git a/kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala b/kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala new file mode 100644 index 00000000..2f77df95 --- /dev/null +++ b/kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package testkit + +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.{ ActorSystem, Extension, ExtensionId } + +object AkkaExtensionSwap { + def swap(system: ActorSystem, key: ExtensionId[_], value: Extension): Unit = { + val extensionsField = system.getClass.getDeclaredField("extensions") + extensionsField.setAccessible(true) + + val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]] + extensions.put(key, value) + } +} diff --git a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala index 825cc718..9e736971 100644 --- a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala +++ b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala @@ -17,7 +17,7 @@ package akka.testkit import org.aspectj.lang.annotation._ -import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceContext } import org.aspectj.lang.ProceedingJoinPoint import akka.testkit.TestActor.RealMessage @@ -46,7 +46,7 @@ class TestProbeInstrumentation { case _ ⇒ EmptyTraceContext } - TraceRecorder.withTraceContext(traceContext) { + TraceContext.withContext(traceContext) { pjp.proceed } } -- cgit v1.2.3