aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-01-12 01:45:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-01-24 23:19:01 +0100
commit485abe569d23bccf2d263c82b43e59464dc7e834 (patch)
tree34dd5129afe4c4705ce80830caf8d5e48212ce39
parent61089a75240f5cc21b056087f1d633dd31981c61 (diff)
downloadKamon-485abe569d23bccf2d263c82b43e59464dc7e834.tar.gz
Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.tar.bz2
Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.zip
! all: improve the metric recorders infrastructure
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml8
-rw-r--r--kamon-core/src/main/resources/reference.conf120
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala19
-rw-r--r--kamon-core/src/main/scala/kamon/ModuleSupervisor.scala48
-rw-r--r--kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala114
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala48
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Entity.scala52
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala75
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala157
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricKey.scala153
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala166
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala100
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Scale.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala173
-rw-r--r--kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala115
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala49
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala74
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala278
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala)0
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala108
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala164
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala56
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala35
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala29
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala55
-rw-r--r--kamon-core/src/main/scala/kamon/metric/package.scala34
-rw-r--r--kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala61
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Incubator.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala37
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala3
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala66
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceExtension.scala91
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala79
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracerExtension.scala94
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala30
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracingContext.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/util/ConfigTools.scala26
-rw-r--r--kamon-core/src/main/scala/kamon/util/FastDispatch.scala22
-rw-r--r--kamon-core/src/main/scala/kamon/util/MapMerge.scala27
-rw-r--r--kamon-core/src/main/scala/kamon/util/Timestamp.scala (renamed from kamon-core/src/main/scala/kamon/TimeUnits.scala)12
-rw-r--r--kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala18
-rw-r--r--kamon-core/src/test/resources/logback.xml24
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala63
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala63
-rw-r--r--kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala112
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala65
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala107
-rw-r--r--kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala296
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala1
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala66
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala34
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala22
-rw-r--r--kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala34
-rw-r--r--kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala76
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala94
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala23
-rw-r--r--kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala9
-rw-r--r--kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala (renamed from kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala)5
-rw-r--r--kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala4
66 files changed, 2127 insertions, 2001 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 47ce11d8..854e9437 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -2,19 +2,13 @@
<aspectj>
<aspects>
+
<!-- Disable AspectJ Weaver not present error -->
<aspect name="kamon.instrumentation.AspectJWeaverMissingWarning"/>
- <!-- Futures -->
- <aspect name="kamon.instrumentation.scala.FutureInstrumentation"/>
- <aspect name="kamon.instrumentation.scalaz.FutureInstrumentation"/>
-
</aspects>
<weaver>
- <include within="scala.concurrent..*"/>
- <include within="scalaz.concurrent..*"/>
- <include within="spray..*"/>
<include within="kamon..*"/>
</weaver>
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 8f5a8b45..cd257ebe 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -3,19 +3,11 @@
# ================================== #
kamon {
-
- # Default dispatcher for all Kamon components, unless a more specific one is configured.
- default-dispatcher = "kamon.kamon-dispatcher"
-
- metrics {
+ metric {
# Time interval for collecting all metrics and send the snapshots to all subscribed actors.
tick-interval = 1 second
- # Time interval for recording values on all registered gauges.
- gauge-recording-interval = 100 milliseconds
-
-
# Default size for the LongBuffer that gets allocated for metrics collection and merge. The
# value should correspond to the highest number of different buckets with values that might
# exist in a single histogram during a metrics collection. The default value of 33792 is a
@@ -31,69 +23,79 @@ kamon {
# it might be ok for you to turn this error off.
disable-aspectj-weaver-missing-error = false
+ # Specify if entities that do not match any include/exclude filter should be tracked.
+ track-unmatched-entities = yes
- dispatchers {
-
- # Dispatcher for periodical gauge value recordings.
- gauge-recordings = ${kamon.default-dispatcher}
-
- # Dispatcher for subscriptions and metrics collection actors.
- metric-subscriptions = ${kamon.default-dispatcher}
- }
-
-
- filters = [
- {
- actor {
- includes = []
- excludes = [ "system/*", "user/IO-*" ]
- }
- },
- {
- router {
- includes = []
- excludes = [ "system/*", "user/IO-*" ]
- }
- },
- {
- trace {
- includes = [ "*" ]
- excludes = []
- }
- },
- {
- dispatcher {
- includes = [ "default-dispatcher" ]
- excludes = []
- }
+ filters {
+ trace {
+ includes = [ "**" ]
+ excludes = [ ]
}
- ]
+ }
- precision {
- default-histogram-precision {
+ # Default instrument settings for histograms, min max counters and gaugues. The actual settings to be used when
+ # creating a instrument is determined by merging the default settings, code settings and specific instrument
+ # settings using the following priorities (top wins):
+
+ # - any setting in `kamon.metric.instrument-settings` for the given category/instrument.
+ # - code settings provided when creating the instrument.
+ # - `default-instrument-settings`.
+ #
+ default-instrument-settings {
+ histogram {
+ precision = normal
+ lowest-discernible-value = 1
highest-trackable-value = 3600000000000
- significant-value-digits = 2
}
- default-min-max-counter-precision {
- refresh-interval = 100 milliseconds
+ min-max-counter {
+ precision = normal
+ lowest-discernible-value = 1
highest-trackable-value = 999999999
- significant-value-digits = 2
+ refresh-interval = 100 milliseconds
}
- default-gauge-precision {
+ gauge {
+ precision = normal
+ lowest-discernible-value = 1
+ highest-trackable-value = 3600000000000
refresh-interval = 100 milliseconds
- highest-trackable-value = 999999999
- significant-value-digits = 2
}
- trace {
- elapsed-time = ${kamon.metrics.precision.default-histogram-precision}
- segment = ${kamon.metrics.precision.default-histogram-precision}
- }
+ }
+
+ # Custom configurations for category instruments. The settings provided in this section will override the default
+ # and code instrument settings as explained in the `default-instrument-settings` key. There is no need to provide
+ # full instrument settings in this section, only the settings that should be overriden must be included. Example:
+ # if you wish to change the precision and lowest discernible value of the `elapsed-time` instrument for the `trace`
+ # category, you should include the following configuration in your application.conf file:
+ #
+ # kamon.metric.instrument-settings.trace {
+ # elapsed-time {
+ # precision = fine
+ # lowest-discernible-value = 1000
+ # }
+ # }
+ #
+ # In this example, the value for the `highest-trackable-value` setting will be either the code setting or the default
+ # setting, depending on how the `elapsed-time` metric is created.
+ instrument-settings {
+
+ }
+
+ dispatchers {
+
+ # Dispatcher for the actor that will collect all recorded metrics on every tick and dispatch them to all subscribers.
+ metric-collection = akka.actor.default-dispatcher
+
+ # Dispatcher for the Kamon refresh scheduler, used by all MinMaxCounters and Gaugues to update their values.
+ refresh-scheduler = akka.actor.default-dispatcher
}
}
+
+
+
trace {
# Level of detail used when recording trace information. The posible values are:
@@ -101,7 +103,7 @@ kamon {
# to the subscriptors of trace data.
# - simple-trace: metrics for all included traces and all segments are recorded and additionally a Trace message
# containing the trace and segments details and metadata.
- level = metrics-only
+ level-of-detail = metrics-only
# Sampling strategy to apply when the tracing level is set to `simple-trace`. The options are: all, random, ordered
# and threshold. The details of each sampler are bellow.
@@ -142,7 +144,7 @@ kamon {
}
# Default dispatcher for all trace module operations
- dispatcher = ${kamon.default-dispatcher}
+ dispatcher = "akka.actor.default-dispatcher"
}
kamon-dispatcher {
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 00026b77..f07f846b 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -16,9 +16,26 @@ package kamon
import _root_.akka.actor
import _root_.akka.actor._
+import com.typesafe.config.Config
+import kamon.metric._
+import kamon.trace.{ Tracer, TracerExtension }
+
+class Kamon(val actorSystem: ActorSystem) {
+ val metrics: MetricsExtension = Metrics.get(actorSystem)
+ val tracer: TracerExtension = Tracer.get(actorSystem)
+ val userMetrics: UserMetricsExtension = UserMetrics.get(actorSystem)
+}
object Kamon {
trait Extension extends actor.Extension
def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system)
-}
+ def apply(actorSystemName: String): Kamon =
+ apply(ActorSystem(actorSystemName))
+
+ def apply(actorSystemName: String, config: Config): Kamon =
+ apply(ActorSystem(actorSystemName, config))
+
+ def apply(system: ActorSystem): Kamon =
+ new Kamon(system)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/ModuleSupervisor.scala b/kamon-core/src/main/scala/kamon/ModuleSupervisor.scala
new file mode 100644
index 00000000..99d87719
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ModuleSupervisor.scala
@@ -0,0 +1,48 @@
+package kamon
+
+import _root_.akka.actor
+import _root_.akka.actor._
+import kamon.ModuleSupervisor.CreateModule
+
+import scala.concurrent.{ Future, Promise }
+import scala.util.Success
+
+object ModuleSupervisor extends ExtensionId[ModuleSupervisorExtension] with ExtensionIdProvider {
+
+ def lookup(): ExtensionId[_ <: actor.Extension] = ModuleSupervisor
+ def createExtension(system: ExtendedActorSystem): ModuleSupervisorExtension = new ModuleSupervisorExtensionImpl(system)
+
+ case class CreateModule(name: String, props: Props, childPromise: Promise[ActorRef])
+}
+
+trait ModuleSupervisorExtension extends actor.Extension {
+ def createModule(name: String, props: Props): Future[ActorRef]
+}
+
+class ModuleSupervisorExtensionImpl(system: ExtendedActorSystem) extends ModuleSupervisorExtension {
+ import system.dispatcher
+ private lazy val supervisor = system.actorOf(Props[ModuleSupervisor], "kamon")
+
+ def createModule(name: String, props: Props): Future[ActorRef] = Future {} flatMap { _: Unit ⇒
+ val modulePromise = Promise[ActorRef]()
+ supervisor ! CreateModule(name, props, modulePromise)
+ modulePromise.future
+ }
+}
+
+class ModuleSupervisor extends Actor with ActorLogging {
+
+ def receive = {
+ case CreateModule(name, props, childPromise) ⇒ createChildModule(name, props, childPromise)
+ }
+
+ def createChildModule(name: String, props: Props, childPromise: Promise[ActorRef]): Unit = {
+ context.child(name).map { alreadyAvailableModule ⇒
+ log.warning("Received a request to create module [{}] but the module is already available, returning the existent one.")
+ childPromise.complete(Success(alreadyAvailableModule))
+
+ } getOrElse {
+ childPromise.complete(Success(context.actorOf(props, name)))
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
index 0dd189f6..22f54ab0 100644
--- a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
@@ -1,99 +1,25 @@
package kamon.http
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.metric.instrument.Counter
-import kamon.metric._
-
-import scala.collection.concurrent.TrieMap
-
-object HttpServerMetrics extends MetricGroupIdentity {
- import Metrics.AtomicGetOrElseUpdateForTriemap
-
- val name: String = "http-server-metrics-recorder"
- val category = new MetricGroupCategory {
- val name: String = "http-server"
- }
-
- type TraceName = String
- type StatusCode = String
-
- case class CountPerStatusCode(statusCode: String) extends MetricIdentity {
- def name: String = statusCode
- }
-
- case class TraceCountPerStatus(traceName: TraceName, statusCode: StatusCode) extends MetricIdentity {
- def name: String = traceName + "_" + statusCode
- }
-
- class HttpServerMetricsRecorder extends MetricGroupRecorder {
-
- private val counters = TrieMap[StatusCode, Counter]()
- private val countersPerTrace = TrieMap[TraceName, TrieMap[StatusCode, Counter]]()
-
- def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L)
-
- def recordResponse(statusCode: StatusCode, count: Long): Unit =
- counters.atomicGetOrElseUpdate(statusCode, Counter()).increment(count)
-
- def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L)
-
- def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = {
- recordResponse(statusCode, count)
- countersPerTrace.atomicGetOrElseUpdate(traceName, TrieMap()).atomicGetOrElseUpdate(statusCode, Counter()).increment(count)
- }
-
- def collect(context: CollectionContext): HttpServerMetricsSnapshot = {
- val countsPerStatusCode = counters.map {
- case (statusCode, counter) ⇒ (statusCode, counter.collect(context))
- }.toMap
-
- val countsPerTraceAndStatus = countersPerTrace.map {
- case (traceName, countsPerStatus) ⇒
- (traceName, countsPerStatus.map { case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) }.toMap)
- }.toMap
-
- HttpServerMetricsSnapshot(countsPerStatusCode, countsPerTraceAndStatus)
- }
-
- def cleanup: Unit = {}
+import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder }
+import kamon.metric.instrument.InstrumentFactory
+
+/**
+ * Counts HTTP response status codes into per status code and per trace name + status counters. If recording a HTTP
+ * response with status 500 for the trace "GetUser", the counter with name "500" as well as the counter with name
+ * "GetUser_500" will be incremented.
+ */
+class HttpServerMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+
+ def recordResponse(statusCode: String): Unit =
+ counter(statusCode).increment()
+
+ def recordResponse(traceName: String, statusCode: String): Unit = {
+ recordResponse(statusCode)
+ counter(traceName + "_" + statusCode).increment()
}
-
- case class HttpServerMetricsSnapshot(countsPerStatusCode: Map[StatusCode, Counter.Snapshot],
- countsPerTraceAndStatusCode: Map[TraceName, Map[StatusCode, Counter.Snapshot]]) extends MetricGroupSnapshot {
-
- type GroupSnapshotType = HttpServerMetricsSnapshot
-
- def merge(that: HttpServerMetricsSnapshot, context: CollectionContext): HttpServerMetricsSnapshot = {
- val combinedCountsPerStatus = combineMaps(countsPerStatusCode, that.countsPerStatusCode)((l, r) ⇒ l.merge(r, context))
- val combinedCountsPerTraceAndStatus = combineMaps(countsPerTraceAndStatusCode, that.countsPerTraceAndStatusCode) {
- (leftCounts, rightCounts) ⇒ combineMaps(leftCounts, rightCounts)((l, r) ⇒ l.merge(r, context))
- }
- HttpServerMetricsSnapshot(combinedCountsPerStatus, combinedCountsPerTraceAndStatus)
- }
-
- def metrics: Map[MetricIdentity, MetricSnapshot] = {
- countsPerStatusCode.map {
- case (statusCode, count) ⇒ (CountPerStatusCode(statusCode), count)
- } ++ {
- for (
- (traceName, countsPerStatus) ← countsPerTraceAndStatusCode;
- (statusCode, count) ← countsPerStatus
- ) yield (TraceCountPerStatus(traceName, statusCode), count)
- }
- }
- }
-
- val Factory = HttpServerMetricGroupFactory
}
-case object HttpServerMetricGroupFactory extends MetricGroupFactory {
-
- import HttpServerMetrics._
-
- type GroupRecorder = HttpServerMetricsRecorder
-
- def create(config: Config, system: ActorSystem): HttpServerMetricsRecorder =
- new HttpServerMetricsRecorder()
-
-} \ No newline at end of file
+object HttpServerMetrics extends EntityRecorderFactory[HttpServerMetrics] {
+ def category: String = "http-server"
+ def createRecorder(instrumentFactory: InstrumentFactory): HttpServerMetrics = new HttpServerMetrics(instrumentFactory)
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala
deleted file mode 100644
index bda2da78..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.instrumentation.scala
-
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class FutureInstrumentation {
-
- @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
- def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default
-
- @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)")
- def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {}
-
- @After("futureRelatedRunnableCreation(runnable)")
- def afterCreation(runnable: TraceContextAware): Unit = {
- // Force traceContext initialization.
- runnable.traceContext
- }
-
- @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)")
- def futureRelatedRunnableExecution(runnable: TraceContextAware) = {}
-
- @Around("futureRelatedRunnableExecution(runnable)")
- def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = {
- TraceRecorder.withInlineTraceContextReplacement(runnable.traceContext) {
- pjp.proceed()
- }
- }
-
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala
deleted file mode 100644
index 65caaa8f..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/scalaz/FutureInstrumentation.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.instrumentation.scalaz
-
-import kamon.trace.{ TraceContextAware, TraceRecorder }
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class FutureInstrumentation {
-
- @DeclareMixin("scalaz.concurrent..* && java.util.concurrent.Callable+")
- def mixinTraceContextAwareToFutureRelatedCallable: TraceContextAware =
- TraceContextAware.default
-
- @Pointcut("execution((scalaz.concurrent..* && java.util.concurrent.Callable+).new(..)) && this(callable)")
- def futureRelatedCallableCreation(callable: TraceContextAware): Unit = {}
-
- @After("futureRelatedCallableCreation(callable)")
- def afterCreation(callable: TraceContextAware): Unit =
- // Force traceContext initialization.
- callable.traceContext
-
- @Pointcut("execution(* (scalaz.concurrent..* && java.util.concurrent.Callable+).call()) && this(callable)")
- def futureRelatedCallableExecution(callable: TraceContextAware): Unit = {}
-
- @Around("futureRelatedCallableExecution(callable)")
- def aroundExecution(pjp: ProceedingJoinPoint, callable: TraceContextAware): Any =
- TraceRecorder.withInlineTraceContextReplacement(callable.traceContext) {
- pjp.proceed()
- }
-
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/Entity.scala b/kamon-core/src/main/scala/kamon/metric/Entity.scala
new file mode 100644
index 00000000..962626e0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Entity.scala
@@ -0,0 +1,52 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+/**
+ * Identify a `thing` that is being monitored by Kamon. A [[kamon.metric.Entity]] is used to identify tracked `things`
+ * in both the metrics recording and reporting sides. Only the name and category fields are used with determining
+ * equality between two entities.
+ *
+ * // TODO: Find a better word for `thing`.
+ */
+class Entity(val name: String, val category: String, val metadata: Map[String, String]) {
+
+ override def equals(o: Any): Boolean = {
+ if (this eq o.asInstanceOf[AnyRef])
+ true
+ else if ((o.asInstanceOf[AnyRef] eq null) || !o.isInstanceOf[Entity])
+ false
+ else {
+ val thatAsEntity = o.asInstanceOf[Entity]
+ category == thatAsEntity.category && name == thatAsEntity.name
+ }
+ }
+
+ override def hashCode: Int = {
+ var result: Int = name.hashCode
+ result = 31 * result + category.hashCode
+ return result
+ }
+}
+
+object Entity {
+ def apply(name: String, category: String): Entity =
+ apply(name, category, Map.empty)
+
+ def apply(name: String, category: String, metadata: Map[String, String]): Entity =
+ new Entity(name, category, metadata)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
deleted file mode 100644
index 3761f5a5..00000000
--- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metric
-
-import java.nio.{ LongBuffer }
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-
-trait MetricGroupCategory {
- def name: String
-}
-
-trait MetricGroupIdentity {
- def name: String
- def category: MetricGroupCategory
-}
-
-trait MetricIdentity {
- def name: String
-}
-
-trait CollectionContext {
- def buffer: LongBuffer
-}
-
-object CollectionContext {
- def apply(longBufferSize: Int): CollectionContext = new CollectionContext {
- val buffer: LongBuffer = LongBuffer.allocate(longBufferSize)
- }
-}
-
-trait MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot
- def cleanup: Unit
-}
-
-trait MetricSnapshot {
- type SnapshotType
-
- def merge(that: SnapshotType, context: CollectionContext): SnapshotType
-}
-
-trait MetricGroupSnapshot {
- type GroupSnapshotType
-
- def metrics: Map[MetricIdentity, MetricSnapshot]
- def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType
-}
-
-private[kamon] trait MetricRecorder {
- type SnapshotType <: MetricSnapshot
-
- def collect(context: CollectionContext): SnapshotType
- def cleanup: Unit
-}
-
-trait MetricGroupFactory {
- type GroupRecorder <: MetricGroupRecorder
- def create(config: Config, system: ActorSystem): GroupRecorder
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
new file mode 100644
index 00000000..7a1972f0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
@@ -0,0 +1,157 @@
+package kamon.metric
+
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.metric.instrument._
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+
+trait EntityRecorder {
+ def collect(collectionContext: CollectionContext): EntitySnapshot
+ def cleanup: Unit
+}
+
+trait EntityRecorderFactory[T <: EntityRecorder] {
+ def category: String
+ def createRecorder(instrumentFactory: InstrumentFactory): T
+}
+
+abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder {
+ import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax
+
+ private val _instruments = TrieMap.empty[MetricKey, Instrument]
+ private def register[T <: Instrument](key: MetricKey, instrument: ⇒ T): T =
+ _instruments.atomicGetOrElseUpdate(key, instrument).asInstanceOf[T]
+
+ protected def histogram(name: String): Histogram =
+ register(HistogramKey(name), instrumentFactory.createHistogram(name))
+
+ protected def histogram(name: String, dynamicRange: DynamicRange): Histogram =
+ register(HistogramKey(name), instrumentFactory.createHistogram(name, Some(dynamicRange)))
+
+ protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name))
+
+ protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange)))
+
+ protected def histogram(key: HistogramKey): Histogram =
+ register(key, instrumentFactory.createHistogram(key.name))
+
+ protected def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram =
+ register(key, instrumentFactory.createHistogram(key.name, Some(dynamicRange)))
+
+ protected def removeHistogram(name: String): Unit =
+ _instruments.remove(HistogramKey(name))
+
+ protected def removeHistogram(key: HistogramKey): Unit =
+ _instruments.remove(key)
+
+ protected def minMaxCounter(name: String): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange)))
+
+ protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange)))
+
+ protected def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def removeMinMaxCounter(name: String): Unit =
+ _instruments.remove(MinMaxCounterKey(name))
+
+ protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit =
+ _instruments.remove(key)
+
+ protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector))
+
+ protected def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def removeGauge(name: String): Unit =
+ _instruments.remove(GaugeKey(name))
+
+ protected def removeGauge(key: GaugeKey): Unit =
+ _instruments.remove(key)
+
+ protected def counter(name: String): Counter =
+ register(CounterKey(name), instrumentFactory.createCounter())
+
+ protected def counter(key: CounterKey): Counter =
+ register(key, instrumentFactory.createCounter())
+
+ protected def removeCounter(name: String): Unit =
+ _instruments.remove(CounterKey(name))
+
+ protected def removeCounter(key: CounterKey): Unit =
+ _instruments.remove(key)
+
+ def collect(collectionContext: CollectionContext): EntitySnapshot = {
+ val snapshots = Map.newBuilder[MetricKey, InstrumentSnapshot]
+ _instruments.foreach {
+ case (key, instrument) ⇒ snapshots += key -> instrument.collect(collectionContext)
+ }
+
+ new DefaultEntitySnapshot(snapshots.result())
+ }
+
+ def cleanup: Unit = _instruments.values.foreach(_.cleanup)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
new file mode 100644
index 00000000..17c8f4c5
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
@@ -0,0 +1,47 @@
+package kamon.metric
+
+import kamon.metric.instrument.{ Counter, Histogram, CollectionContext, InstrumentSnapshot }
+import kamon.util.MapMerge
+import scala.reflect.ClassTag
+
+trait EntitySnapshot {
+ def metrics: Map[MetricKey, InstrumentSnapshot]
+ def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot
+
+ def histogram(name: String): Option[Histogram.Snapshot] =
+ find[HistogramKey, Histogram.Snapshot](name)
+
+ def minMaxCounter(name: String): Option[Histogram.Snapshot] =
+ find[MinMaxCounterKey, Histogram.Snapshot](name)
+
+ def gauge(name: String): Option[Histogram.Snapshot] =
+ find[GaugeKey, Histogram.Snapshot](name)
+
+ def counter(name: String): Option[Counter.Snapshot] =
+ find[CounterKey, Counter.Snapshot](name)
+
+ def histograms: Map[HistogramKey, Histogram.Snapshot] =
+ filterByType[HistogramKey, Histogram.Snapshot]
+
+ def minMaxCounters: Map[MinMaxCounterKey, Histogram.Snapshot] =
+ filterByType[MinMaxCounterKey, Histogram.Snapshot]
+
+ def gauges: Map[GaugeKey, Histogram.Snapshot] =
+ filterByType[GaugeKey, Histogram.Snapshot]
+
+ def counters: Map[CounterKey, Counter.Snapshot] =
+ filterByType[CounterKey, Counter.Snapshot]
+
+ private def filterByType[K <: MetricKey, V <: InstrumentSnapshot](implicit keyCT: ClassTag[K]): Map[K, V] =
+ metrics.collect { case (k, v) if keyCT.runtimeClass.isInstance(k) ⇒ (k.asInstanceOf[K], v.asInstanceOf[V]) }
+
+ private def find[K <: MetricKey, V <: InstrumentSnapshot](name: String)(implicit keyCT: ClassTag[K]) =
+ metrics.find { case (k, v) ⇒ keyCT.runtimeClass.isInstance(k) && k.name == name } map (_._2.asInstanceOf[V])
+}
+
+class DefaultEntitySnapshot(val metrics: Map[MetricKey, InstrumentSnapshot]) extends EntitySnapshot {
+ import MapMerge.Syntax
+
+ override def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot =
+ new DefaultEntitySnapshot(metrics.merge(that.metrics, (l, r) ⇒ l.merge(r, collectionContext)))
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala
new file mode 100644
index 00000000..a17972df
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala
@@ -0,0 +1,153 @@
+package kamon.metric
+
+import kamon.metric.instrument.{ InstrumentTypes, InstrumentType, UnitOfMeasurement }
+
+/**
+ * MetricKeys are used to identify a given metric in entity recorders and snapshots. MetricKeys can be used to encode
+ * additional metadata for a metric being recorded, as well as the unit of measurement of the data being recorder.
+ */
+sealed trait MetricKey {
+ def name: String
+ def unitOfMeasurement: UnitOfMeasurement
+ def instrumentType: InstrumentType
+ def metadata: Map[String, String]
+}
+
+// Wish that there was a shorter way to describe the operations bellow, but apparently there is no way to generalize all
+// the apply/create versions that would produce the desired return types when used from Java.
+
+/**
+ * MetricKey for all Histogram-based metrics.
+ */
+case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.Histogram
+}
+
+object HistogramKey {
+ def apply(name: String): HistogramKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): HistogramKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): HistogramKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): HistogramKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): HistogramKey =
+ apply(name, unitOfMeasurement, metadata)
+}
+
+/**
+ * MetricKey for all MinMaxCounter-based metrics.
+ */
+case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.MinMaxCounter
+}
+
+object MinMaxCounterKey {
+ def apply(name: String): MinMaxCounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): MinMaxCounterKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): MinMaxCounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): MinMaxCounterKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): MinMaxCounterKey =
+ apply(name, unitOfMeasurement, metadata)
+}
+
+/**
+ * MetricKey for all Gauge-based metrics.
+ */
+case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.Gauge
+}
+
+object GaugeKey {
+ def apply(name: String): GaugeKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): GaugeKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): GaugeKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): GaugeKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): GaugeKey =
+ apply(name, unitOfMeasurement, metadata)
+}
+
+/**
+ * MetricKey for all Counter-based metrics.
+ */
+case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey {
+ val instrumentType = InstrumentTypes.Counter
+}
+
+object CounterKey {
+ def apply(name: String): CounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def apply(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey =
+ apply(name, unitOfMeasurement, Map.empty)
+
+ def apply(name: String, metadata: Map[String, String]): CounterKey =
+ apply(name, UnitOfMeasurement.Unknown, Map.empty)
+
+ /**
+ * Java friendly versions:
+ */
+
+ def create(name: String): CounterKey =
+ apply(name, UnitOfMeasurement.Unknown)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey =
+ apply(name, unitOfMeasurement)
+
+ def create(name: String, metadata: Map[String, String]): CounterKey =
+ apply(name, metadata)
+
+ def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): CounterKey =
+ apply(name, unitOfMeasurement, metadata)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index ed55ab06..b738eeb9 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -16,91 +16,119 @@
package kamon.metric
-import akka.event.Logging.Error
-import akka.event.EventStream
+import akka.actor
+import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe }
+import kamon.{ ModuleSupervisor, Kamon }
+import kamon.metric.instrument.{ InstrumentFactory, CollectionContext }
import scala.collection.concurrent.TrieMap
import akka.actor._
-import com.typesafe.config.Config
-import kamon.util.GlobPathFilter
-import kamon.Kamon
-import akka.actor
-import kamon.metric.Metrics.MetricGroupFilter
-import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe }
-import java.util.concurrent.TimeUnit
+import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate }
-class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- import Metrics.AtomicGetOrElseUpdateForTriemap
+object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
+ override def get(system: ActorSystem): MetricsExtension = super.get(system)
+ def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
+ def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system)
+}
- val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
- printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error"))
+case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T)
- /** Configured Dispatchers */
- val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
- val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
+trait MetricsExtension extends Kamon.Extension {
+ def settings: MetricsExtensionSettings
+ def shouldTrack(entity: Entity): Boolean
+ def shouldTrack(entityName: String, category: String): Boolean =
+ shouldTrack(Entity(entityName, category))
- /** Configuration Settings */
- val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS)
+ def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]]
+ def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T]
+ def unregister(entity: Entity): Unit
- val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
- val filters = loadFilters(metricsExtConfig)
- lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
+ def find(entity: Entity): Option[EntityRecorder]
+ def find(name: String, category: String): Option[EntityRecorder]
- def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
- if (shouldTrack(identity))
- Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder])
- else
- None
- }
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit =
+ subscribe(filter, subscriber, permanently = false)
- def unregister(identity: MetricGroupIdentity): Unit = {
- storage.remove(identity).map(_.cleanup)
- }
+ def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit =
+ subscribe(SubscriptionFilter(category, selection), subscriber, permanently)
- def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit =
- subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber)
+ def subscribe(category: String, selection: String, subscriber: ActorRef): Unit =
+ subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false)
- def unsubscribe(subscriber: ActorRef): Unit =
- subscriptions.tell(Unsubscribe(subscriber), subscriber)
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit
- def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
- import scala.concurrent.duration._
+ def unsubscribe(subscriber: ActorRef): Unit
+ def buildDefaultCollectionContext: CollectionContext
+ def instrumentFactory(category: String): InstrumentFactory
+}
- system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
- body
- }(gaugeRecordingsDispatcher)
- }
+class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension {
+ import FastDispatch.Syntax
- private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
- filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true)
- }
+ val settings = MetricsExtensionSettings(system)
- def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
- import scala.collection.JavaConverters._
+ private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder]
+ private val _collectionContext = buildDefaultCollectionContext
+ private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher)
+ private val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher",
+ SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher))
- val filters = config.getObjectList("filters").asScala
+ def shouldTrack(entity: Entity): Boolean =
+ settings.entityFilters.get(entity.category).map {
+ filter ⇒ filter.accept(entity.name)
- val allFilters =
- for (
- filter ← filters;
- entry ← filter.entrySet().asScala
- ) yield {
- val key = entry.getKey
- val keyBasedConfig = entry.getValue.atKey(key)
+ } getOrElse (settings.trackUnmatchedEntities)
- val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList
- val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList
+ def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = {
+ import TriemapAtomicGetOrElseUpdate.Syntax
+ val entity = Entity(entityName, recorderFactory.category)
- (key, MetricGroupFilter(includes, excludes))
- }
+ if (shouldTrack(entity)) {
+ val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory)
+ val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory)).asInstanceOf[T]
+ Some(EntityRegistration(entity, recorder))
+ } else None
+ }
- allFilters.toMap
+ def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = {
+ import TriemapAtomicGetOrElseUpdate.Syntax
+ EntityRegistration(entity, _trackedEntities.atomicGetOrElseUpdate(entity, recorder).asInstanceOf[T])
}
+ def unregister(entity: Entity): Unit =
+ _trackedEntities.remove(entity).map(_.cleanup)
+
+ def find(entity: Entity): Option[EntityRecorder] =
+ _trackedEntities.get(entity)
+
+ def find(name: String, category: String): Option[EntityRecorder] =
+ find(Entity(name, category))
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit =
+ _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher)
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher)
+
def buildDefaultCollectionContext: CollectionContext =
- CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size"))
+ CollectionContext(settings.defaultCollectionContextBufferSize)
+
+ def instrumentFactory(category: String): InstrumentFactory =
+ settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory)
+
+ /**
+ * Collect and dispatch.
+ */
+ private def collectSnapshots(): Map[Entity, EntitySnapshot] = {
+ val builder = Map.newBuilder[Entity, EntitySnapshot]
+ _trackedEntities.foreach {
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext)))
+ }
- def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = {
+ builder.result()
+ }
+
+ /* def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = {
if (!disableWeaverMissingError) {
val weaverMissingMessage =
"""
@@ -123,22 +151,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage))
}
- }
+ }*/
}
-object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
- def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system)
-
- case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
- def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
- }
-
- implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) {
- def atomicGetOrElseUpdate(key: K, op: ⇒ V): V =
- trieMap.get(key) match {
- case Some(v) ⇒ v
- case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d)
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
new file mode 100644
index 00000000..ca1db850
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
@@ -0,0 +1,100 @@
+package kamon.metric
+
+import akka.actor.ExtendedActorSystem
+import com.typesafe.config.Config
+import kamon.metric.instrument.{ RefreshScheduler, InstrumentFactory, DefaultInstrumentSettings, InstrumentCustomSettings }
+import kamon.util.GlobPathFilter
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key.
+ */
+case class MetricsExtensionSettings(
+ tickInterval: FiniteDuration,
+ defaultCollectionContextBufferSize: Int,
+ trackUnmatchedEntities: Boolean,
+ entityFilters: Map[String, EntityFilter],
+ instrumentFactories: Map[String, InstrumentFactory],
+ defaultInstrumentFactory: InstrumentFactory,
+ metricCollectionDispatcher: String,
+ refreshSchedulerDispatcher: String,
+ refreshScheduler: RefreshScheduler)
+
+/**
+ *
+ */
+case class EntityFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
+ def accept(name: String): Boolean =
+ includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
+}
+
+object MetricsExtensionSettings {
+ import kamon.util.ConfigTools.Syntax
+ import scala.concurrent.duration._
+
+ def apply(system: ExtendedActorSystem): MetricsExtensionSettings = {
+ val metricConfig = system.settings.config.getConfig("kamon.metric")
+
+ val tickInterval = metricConfig.getFiniteDuration("tick-interval")
+ val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size")
+ val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities")
+ val entityFilters = loadFilters(metricConfig.getConfig("filters"))
+ val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings"))
+ val metricCollectionDispatcher = metricConfig.getString("dispatchers.metric-collection")
+ val refreshSchedulerDispatcher = metricConfig.getString("dispatchers.refresh-scheduler")
+
+ val refreshScheduler = RefreshScheduler(system.scheduler, system.dispatchers.lookup(refreshSchedulerDispatcher))
+ val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler)
+ val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler)
+
+ MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories,
+ defaultInstrumentFactory, metricCollectionDispatcher, refreshSchedulerDispatcher, refreshScheduler)
+ }
+
+ /**
+ * Load all the default filters configured under the `kamon.metric.filters` configuration key. All filters are
+ * defined with the entity category as a sub-key of the `kamon.metric.filters` key and two sub-keys to it: includes
+ * and excludes with lists of string glob patterns as values. Example:
+ *
+ * {{{
+ *
+ * kamon.metrics.filters {
+ * actor {
+ * includes = ["user/test-actor", "user/service/worker-*"]
+ * excludes = ["user/IO-*"]
+ * }
+ * }
+ *
+ * }}}
+ *
+ * @return a Map from category name to corresponding entity filter.
+ */
+ def loadFilters(filtersConfig: Config): Map[String, EntityFilter] = {
+ import scala.collection.JavaConverters._
+
+ filtersConfig.firstLevelKeys map { category: String ⇒
+ val includes = filtersConfig.getStringList(s"$category.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList
+ val excludes = filtersConfig.getStringList(s"$category.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList
+
+ (category, EntityFilter(includes, excludes))
+ } toMap
+ }
+
+ /**
+ * Load any custom configuration settings defined under the `kamon.metric.instrument-settings` configuration key and
+ * create InstrumentFactories for them.
+ *
+ * @return a Map from category name to InstrumentFactory.
+ */
+ def loadInstrumentFactories(instrumentSettings: Config, defaults: DefaultInstrumentSettings, refreshScheduler: RefreshScheduler): Map[String, InstrumentFactory] = {
+ instrumentSettings.firstLevelKeys.map { category ⇒
+ val categoryConfig = instrumentSettings.getConfig(category)
+ val customSettings = categoryConfig.firstLevelKeys.map { instrumentName ⇒
+ (instrumentName, InstrumentCustomSettings.fromConfig(categoryConfig.getConfig(instrumentName)))
+ } toMap
+
+ (category, new InstrumentFactory(customSettings, defaults, refreshScheduler))
+ } toMap
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala
deleted file mode 100644
index 2f27c1a3..00000000
--- a/kamon-core/src/main/scala/kamon/metric/Scale.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metric
-
-class Scale(val numericValue: Double) extends AnyVal
-
-object Scale {
- val Nano = new Scale(1E-9)
- val Micro = new Scale(1E-6)
- val Milli = new Scale(1E-3)
- val Unit = new Scale(1)
- val Kilo = new Scale(1E3)
- val Mega = new Scale(1E6)
- val Giga = new Scale(1E9)
-
- def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
deleted file mode 100644
index a22e1c21..00000000
--- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metric
-
-import akka.actor._
-import kamon.metric.Subscriptions._
-import kamon.util.GlobPathFilter
-import scala.concurrent.duration.{ FiniteDuration, Duration }
-import java.util.concurrent.TimeUnit
-import kamon.{ MilliTimestamp, Kamon }
-import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
-
-class Subscriptions extends Actor {
- import context.system
-
- val flushMetricsSchedule = scheduleFlushMessage()
- val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
-
- var lastTick: MilliTimestamp = MilliTimestamp.now
- var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
- var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
-
- def receive = {
- case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent)
- case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
- case Terminated(subscriber) ⇒ unsubscribe(subscriber)
- case FlushMetrics ⇒ flush()
- }
-
- def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = {
- context.watch(subscriber)
- val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection))
-
- if (permanent) {
- permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine {
- permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
- })
- } else {
- oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine {
- oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
- })
- }
- }
-
- def unsubscribe(subscriber: ActorRef): Unit = {
- if (permanentSubscriptions.contains(subscriber))
- permanentSubscriptions = permanentSubscriptions - subscriber
-
- if (oneShotSubscriptions.contains(subscriber))
- oneShotSubscriptions = oneShotSubscriptions - subscriber
- }
-
- def flush(): Unit = {
- val currentTick = MilliTimestamp.now
- val snapshots = collectAll()
-
- dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots)
- dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots)
-
- lastTick = currentTick
- oneShotSubscriptions = Map.empty
- }
-
- def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = {
- val allMetrics = Kamon(Metrics).storage
- val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot]
-
- allMetrics.foreach {
- case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
- }
-
- builder.result()
- }
-
- def dispatchSelectedMetrics(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, MetricSelectionFilter],
- snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
-
- for ((subscriber, filter) ← subscriptions) {
- val selection = snapshots.filter(group ⇒ filter.accept(group._1))
- val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
-
- subscriber ! tickMetrics
- }
- }
-
- def scheduleFlushMessage(): Cancellable = {
- val config = context.system.settings.config
- val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
- context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
- }
-}
-
-object Subscriptions {
- case object FlushMetrics
- case class Unsubscribe(subscriber: ActorRef)
- case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false)
- case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
-
- trait MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean
- }
-
- object MetricSelectionFilter {
- val empty = new MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean = false
- }
-
- implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) {
- def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity)
- }
- }
- }
-
- case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter {
- def accept(identity: MetricGroupIdentity): Boolean = {
- category.equals(identity.category) && globFilter.accept(identity.name)
- }
- }
-}
-
-class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
- val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
- val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext
-
- def receive = empty
-
- def empty: Actor.Receive = {
- case tick: TickMetricSnapshot ⇒ context become (buffering(tick))
- case FlushBuffer ⇒ // Nothing to flush.
- }
-
- def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
- case TickMetricSnapshot(_, to, tickMetrics) ⇒
- val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup)
- val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
-
- context become (buffering(combinedSnapshot))
-
- case FlushBuffer ⇒
- receiver ! buffered
- context become (empty)
-
- }
-
- override def postStop(): Unit = {
- flushSchedule.cancel()
- super.postStop()
- }
-
- def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext)))
-}
-
-object TickMetricSnapshotBuffer {
- case object FlushBuffer
-
- def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
- Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
new file mode 100644
index 00000000..f616be35
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
@@ -0,0 +1,115 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor._
+import kamon.metric.SubscriptionsDispatcher._
+import kamon.util.{ MilliTimestamp, GlobPathFilter }
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers.
+ */
+private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]) extends Actor {
+ var lastTick = MilliTimestamp.now
+ var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher)
+
+ def receive = {
+ case Tick ⇒ processTick()
+ case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently)
+ case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
+ case Terminated(subscriber) ⇒ unsubscribe(subscriber)
+ }
+
+ def processTick(): Unit =
+ dispatch(collector())
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = {
+ def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] =
+ storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter))
+
+ context.watch(subscriber)
+
+ if (permanent)
+ permanentSubscriptions = addSubscription(permanentSubscriptions)
+ else
+ oneShotSubscriptions = addSubscription(oneShotSubscriptions)
+ }
+
+ def unsubscribe(subscriber: ActorRef): Unit = {
+ permanentSubscriptions = permanentSubscriptions - subscriber
+ oneShotSubscriptions = oneShotSubscriptions - subscriber
+ }
+
+ def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = {
+ val currentTick = MilliTimestamp.now
+
+ dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots)
+ dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots)
+
+ lastTick = currentTick
+ oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ }
+
+ def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter],
+ snapshots: Map[Entity, EntitySnapshot]): Unit = {
+
+ for ((subscriber, filter) ← subscriptions) {
+ val selection = snapshots.filter(group ⇒ filter.accept(group._1))
+ val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
+
+ subscriber ! tickMetrics
+ }
+ }
+}
+
+object SubscriptionsDispatcher {
+ def props(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]): Props =
+ Props(new SubscriptionsDispatcher(interval, collector))
+
+ case object Tick
+ case class Unsubscribe(subscriber: ActorRef)
+ case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false)
+ case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot])
+
+}
+
+trait SubscriptionFilter { self ⇒
+
+ def accept(entity: Entity): Boolean
+
+ final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter {
+ override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity)
+ }
+}
+
+object SubscriptionFilter {
+ val Empty = new SubscriptionFilter {
+ def accept(entity: Entity): Boolean = false
+ }
+
+ def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter {
+ val categoryPattern = new GlobPathFilter(category)
+ val namePattern = new GlobPathFilter(name)
+
+ def accept(entity: Entity): Boolean = {
+ categoryPattern.accept(entity.category) && namePattern.accept(entity.name)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
new file mode 100644
index 00000000..b9127118
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
@@ -0,0 +1,49 @@
+package kamon.metric
+
+import akka.actor.{ Props, Actor, ActorRef }
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
+import kamon.metric.instrument.CollectionContext
+import kamon.util.MapMerge
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
+ import MapMerge.Syntax
+
+ val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
+ val collectionContext: CollectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext
+
+ def receive = empty
+
+ def empty: Actor.Receive = {
+ case tick: TickMetricSnapshot ⇒ context become (buffering(tick))
+ case FlushBuffer ⇒ // Nothing to flush.
+ }
+
+ def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
+ case TickMetricSnapshot(_, to, tickMetrics) ⇒
+ val combinedMetrics = buffered.metrics.merge(tickMetrics, (l, r) ⇒ l.merge(r, collectionContext))
+ val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
+
+ context become (buffering(combinedSnapshot))
+
+ case FlushBuffer ⇒
+ receiver ! buffered
+ context become (empty)
+
+ }
+
+ override def postStop(): Unit = {
+ flushSchedule.cancel()
+ super.postStop()
+ }
+}
+
+object TickMetricSnapshotBuffer {
+ case object FlushBuffer
+
+ def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
+ Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
index eaad6e0d..3da9c1d4 100644
--- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -16,67 +16,29 @@
package kamon.metric
-import akka.actor.ActorSystem
-import kamon.metric.instrument.{ Histogram }
+import kamon.metric.instrument.{ Time, InstrumentFactory, Histogram }
-import scala.collection.concurrent.TrieMap
-import com.typesafe.config.Config
+class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ import TraceMetrics.segmentKey
-case class TraceMetrics(name: String) extends MetricGroupIdentity {
- val category = TraceMetrics
-}
-
-object TraceMetrics extends MetricGroupCategory {
- import Metrics.AtomicGetOrElseUpdateForTriemap
-
- val name = "trace"
-
- case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
-
- case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram)
- extends MetricGroupRecorder {
-
- val segments = TrieMap[MetricIdentity, Histogram]()
-
- def segmentRecorder(segmentIdentity: MetricIdentity): Histogram =
- segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
-
- def collect(context: CollectionContext): TraceMetricsSnapshot =
- TraceMetricsSnapshot(
- elapsedTime.collect(context),
- segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap)
+ /**
+ * Records blah blah
+ */
+ val ElapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds)
- def cleanup: Unit = {}
- }
-
- case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot])
- extends MetricGroupSnapshot {
-
- type GroupSnapshotType = TraceMetricsSnapshot
-
- def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot =
- TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) ⇒ l.merge(r, context)))
-
- def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime)
- }
-
- val Factory = TraceMetricGroupFactory
+ /**
+ * Records Blah Blah.
+ *
+ */
+ def segment(name: String, category: String, library: String): Histogram =
+ histogram(segmentKey(name, category, library))
}
-case object TraceMetricGroupFactory extends MetricGroupFactory {
-
- import TraceMetrics._
-
- type GroupRecorder = TraceMetricRecorder
-
- def create(config: Config, system: ActorSystem): TraceMetricRecorder = {
- val settings = config.getConfig("precision.trace")
- val elapsedTimeConfig = settings.getConfig("elapsed-time")
- val segmentConfig = settings.getConfig("segment")
+object TraceMetrics extends EntityRecorderFactory[TraceMetrics] {
+ def category: String = "trace"
+ def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory)
- new TraceMetricRecorder(
- Histogram.fromConfig(elapsedTimeConfig, Scale.Nano),
- () ⇒ Histogram.fromConfig(segmentConfig, Scale.Nano))
- }
+ def segmentKey(name: String, category: String, library: String): HistogramKey =
+ HistogramKey(name, Time.Nanoseconds, Map("category" -> category, "library" -> library))
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
index b7ac1ac5..5e1a7629 100644
--- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -1,189 +1,193 @@
package kamon.metric
import akka.actor
-import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
import kamon.Kamon
-import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.metric.instrument._
import scala.concurrent.duration.FiniteDuration
-class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- import Metrics.AtomicGetOrElseUpdateForTriemap
- import UserMetrics._
-
- lazy val metricsExtension = Kamon(Metrics)(system)
- val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision")
-
- val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision")
- val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision")
- val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision")
+object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider {
+ override def get(system: ActorSystem): UserMetricsExtension = super.get(system)
+ def lookup(): ExtensionId[_ <: actor.Extension] = UserMetrics
+ def createExtension(system: ExtendedActorSystem): UserMetricsExtension = {
+ val metricsExtension = Metrics.get(system)
+ val instrumentFactory = metricsExtension.instrumentFactory(entity.category)
+ val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory)
- def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), {
- UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit))
- }).asInstanceOf[UserHistogramRecorder].histogram
+ metricsExtension.register(entity, userMetricsExtension).recorder
}
- def registerHistogram(name: String): Histogram = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), {
- UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig))
- }).asInstanceOf[UserHistogramRecorder].histogram
- }
+ val entity = Entity("user-metric", "user-metric")
+}
- def registerCounter(name: String): Counter = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), {
- UserCounterRecorder(Counter())
- }).asInstanceOf[UserCounterRecorder].counter
- }
+trait UserMetricsExtension extends Kamon.Extension {
+ def histogram(name: String): Histogram
+ def histogram(name: String, dynamicRange: DynamicRange): Histogram
+ def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram
+ def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram
+ def histogram(key: HistogramKey): Histogram
+ def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram
+ def removeHistogram(name: String): Unit
+ def removeHistogram(key: HistogramKey): Unit
+
+ def minMaxCounter(name: String): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter
+ def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter
+ def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter
+ def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter
+ def removeMinMaxCounter(name: String): Unit
+ def removeMinMaxCounter(key: MinMaxCounterKey): Unit
+
+ def gauge(name: String, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge
+ def removeGauge(name: String): Unit
+ def removeGauge(key: GaugeKey): Unit
+
+ def counter(name: String): Counter
+ def counter(key: CounterKey): Counter
+ def removeCounter(name: String): Unit
+ def removeCounter(key: CounterKey): Unit
- def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration): MinMaxCounter = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), {
- UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system))
- }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter
- }
+}
- def registerMinMaxCounter(name: String): MinMaxCounter = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), {
- UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system))
- }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter
- }
+class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension {
+ override def histogram(name: String): Histogram =
+ super.histogram(name)
- def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), {
- UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector))
- }).asInstanceOf[UserGaugeRecorder].gauge
- }
+ override def histogram(name: String, dynamicRange: DynamicRange): Histogram =
+ super.histogram(name, dynamicRange)
- def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = {
- metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), {
- UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector))
- }).asInstanceOf[UserGaugeRecorder].gauge
- }
+ override def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ super.histogram(name, unitOfMeasurement)
- def removeHistogram(name: String): Unit =
- metricsExtension.unregister(UserHistogram(name))
+ override def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ super.histogram(name, dynamicRange, unitOfMeasurement)
- def removeCounter(name: String): Unit =
- metricsExtension.unregister(UserCounter(name))
+ override def histogram(key: HistogramKey): Histogram =
+ super.histogram(key)
- def removeMinMaxCounter(name: String): Unit =
- metricsExtension.unregister(UserMinMaxCounter(name))
+ override def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram =
+ super.histogram(key, dynamicRange)
- def removeGauge(name: String): Unit =
- metricsExtension.unregister(UserGauge(name))
-}
+ override def removeHistogram(name: String): Unit =
+ super.removeHistogram(name)
-object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
+ override def removeHistogram(key: HistogramKey): Unit =
+ super.removeHistogram(key)
- def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system)
+ override def minMaxCounter(name: String): MinMaxCounter =
+ super.minMaxCounter(name)
- sealed trait UserMetricGroup
- //
- // Histograms
- //
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange)
- case class UserHistogram(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserHistograms
- }
+ override def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(name, refreshInterval)
- case class UserHistogramRecorder(histogram: Histogram) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserHistogramSnapshot(histogram.collect(context))
+ override def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, unitOfMeasurement)
- def cleanup: Unit = histogram.cleanup
- }
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange, refreshInterval)
- case class UserHistogramSnapshot(histogramSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserHistogramSnapshot
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange, unitOfMeasurement)
- def merge(that: UserHistogramSnapshot, context: CollectionContext): UserHistogramSnapshot =
- UserHistogramSnapshot(that.histogramSnapshot.merge(histogramSnapshot, context))
+ override def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, refreshInterval, unitOfMeasurement)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, histogramSnapshot))
- }
+ override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ super.minMaxCounter(name, dynamicRange, refreshInterval, unitOfMeasurement)
- //
- // Counters
- //
+ override def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter =
+ super.minMaxCounter(key)
- case class UserCounter(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserCounters
- }
+ override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter =
+ super.minMaxCounter(key, dynamicRange)
- case class UserCounterRecorder(counter: Counter) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserCounterSnapshot(counter.collect(context))
+ override def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(key, refreshInterval)
- def cleanup: Unit = counter.cleanup
- }
+ override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ super.minMaxCounter(key, dynamicRange, refreshInterval)
- case class UserCounterSnapshot(counterSnapshot: Counter.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserCounterSnapshot
+ override def removeMinMaxCounter(name: String): Unit =
+ super.removeMinMaxCounter(name)
- def merge(that: UserCounterSnapshot, context: CollectionContext): UserCounterSnapshot =
- UserCounterSnapshot(that.counterSnapshot.merge(counterSnapshot, context))
+ override def removeMinMaxCounter(key: MinMaxCounterKey): Unit =
+ super.removeMinMaxCounter(key)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((Count, counterSnapshot))
- }
+ override def gauge(name: String, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, valueCollector)
- //
- // MinMaxCounters
- //
+ override def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, valueCollector)
- case class UserMinMaxCounter(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserMinMaxCounters
- }
+ override def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, refreshInterval, valueCollector)
- case class UserMinMaxCounterRecorder(minMaxCounter: MinMaxCounter) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserMinMaxCounterSnapshot(minMaxCounter.collect(context))
+ override def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, unitOfMeasurement, valueCollector)
- def cleanup: Unit = minMaxCounter.cleanup
- }
+ override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, refreshInterval, valueCollector)
- case class UserMinMaxCounterSnapshot(minMaxCounterSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserMinMaxCounterSnapshot
+ override def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, unitOfMeasurement, valueCollector)
- def merge(that: UserMinMaxCounterSnapshot, context: CollectionContext): UserMinMaxCounterSnapshot =
- UserMinMaxCounterSnapshot(that.minMaxCounterSnapshot.merge(minMaxCounterSnapshot, context))
+ override def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, refreshInterval, unitOfMeasurement, valueCollector)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, minMaxCounterSnapshot))
- }
-
- //
- // Gauges
- //
+ override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(name, dynamicRange, refreshInterval, unitOfMeasurement, valueCollector)
- case class UserGauge(name: String) extends MetricGroupIdentity with UserMetricGroup {
- val category = UserGauges
- }
+ override def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, valueCollector)
- case class UserGaugeRecorder(gauge: Gauge) extends MetricGroupRecorder {
- def collect(context: CollectionContext): MetricGroupSnapshot =
- UserGaugeSnapshot(gauge.collect(context))
+ override def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, dynamicRange, valueCollector)
- def cleanup: Unit = gauge.cleanup
- }
+ override def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, refreshInterval, valueCollector)
- case class UserGaugeSnapshot(gaugeSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot {
- type GroupSnapshotType = UserGaugeSnapshot
+ override def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ super.gauge(key, dynamicRange, refreshInterval, valueCollector)
- def merge(that: UserGaugeSnapshot, context: CollectionContext): UserGaugeSnapshot =
- UserGaugeSnapshot(that.gaugeSnapshot.merge(gaugeSnapshot, context))
+ override def removeGauge(name: String): Unit =
+ super.removeGauge(name)
- def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, gaugeSnapshot))
- }
+ override def removeGauge(key: GaugeKey): Unit =
+ super.removeGauge(key)
- case object UserHistograms extends MetricGroupCategory { val name: String = "histogram" }
- case object UserCounters extends MetricGroupCategory { val name: String = "counter" }
- case object UserMinMaxCounters extends MetricGroupCategory { val name: String = "min-max-counter" }
- case object UserGauges extends MetricGroupCategory { val name: String = "gauge" }
+ override def counter(name: String): Counter =
+ super.counter(name)
- case object RecordedValues extends MetricIdentity { val name: String = "values" }
- case object Count extends MetricIdentity { val name: String = "count" }
+ override def counter(key: CounterKey): Counter =
+ super.counter(key)
-}
+ override def removeCounter(name: String): Unit =
+ super.removeCounter(name)
+ override def removeCounter(key: CounterKey): Unit =
+ super.removeCounter(key)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala
index e79090a8..e79090a8 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
index 0f29ba6f..c1b69cbe 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
@@ -17,9 +17,8 @@
package kamon.metric.instrument
import kamon.jsr166.LongAdder
-import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder }
-trait Counter extends MetricRecorder {
+trait Counter extends Instrument {
type SnapshotType = Counter.Snapshot
def increment(): Unit
@@ -29,12 +28,11 @@ trait Counter extends MetricRecorder {
object Counter {
def apply(): Counter = new LongAdderCounter
+ def create(): Counter = apply()
- trait Snapshot extends MetricSnapshot {
- type SnapshotType = Counter.Snapshot
-
+ trait Snapshot extends InstrumentSnapshot {
def count: Long
- def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot
}
}
@@ -55,5 +53,8 @@ class LongAdderCounter extends Counter {
}
case class CounterSnapshot(count: Long) extends Counter.Snapshot {
- def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count)
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match {
+ case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount)
+ case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.")
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
index efd7d78f..2341504c 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
@@ -1,70 +1,89 @@
package kamon.metric.instrument
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference }
-import akka.actor.{ Cancellable, ActorSystem }
-import com.typesafe.config.Config
-import kamon.metric.{ CollectionContext, Scale, MetricRecorder }
+import akka.actor.Cancellable
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
import scala.concurrent.duration.FiniteDuration
-trait Gauge extends MetricRecorder {
+trait Gauge extends Instrument {
type SnapshotType = Histogram.Snapshot
- def record(value: Long)
- def record(value: Long, count: Long)
+ def record(value: Long): Unit
+ def record(value: Long, count: Long): Unit
+ def refreshValue(): Unit
}
object Gauge {
- trait CurrentValueCollector {
- def currentValue: Long
- }
-
- def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration,
- system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
-
- val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
- val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector)
-
- val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = {
+ val underlyingHistogram = Histogram(dynamicRange)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector)
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
gauge.refreshValue()
- }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ })
- gauge.refreshValuesSchedule.set(refreshValuesSchedule)
+ gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule)
gauge
}
- def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge =
- fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction))
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge =
+ apply(dynamicRange, refreshInterval, scheduler, valueCollector)
- def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = {
- val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision")
- fromConfig(config, system)(currentValueCollector)
+ trait CurrentValueCollector {
+ def currentValue: Long
}
- def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = {
- import scala.concurrent.duration._
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+}
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
- val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+/**
+ * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between
+ * the current observed value and the previously observed value. Should only be used if the observed value is always
+ * increasing or staying steady, but is never able to decrease.
+ *
+ * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between
+ * the current value and the last value will be returned.
+ */
+class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector {
+ @volatile private var _readAtLeastOnce = false
+ private val _lastObservedValue = new AtomicLong(0)
+
+ def currentValue: Long = {
+ if (_readAtLeastOnce) {
+ val wrappedCurrent = wrappedValueCollector.currentValue
+ val d = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent)
+
+ if (d < 0)
+ println("HUBO MENOR QUE CERO")
+
+ d
+
+ } else {
+ _lastObservedValue.set(wrappedValueCollector.currentValue)
+ _readAtLeastOnce = true
+ 0
+ }
- Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector)
}
+}
- def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
- fromConfig(config, system, Scale.Unit)(currentValueCollector)
- }
+object DifferentialValueCollector {
+ def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector =
+ new DifferentialValueCollector(wrappedValueCollector)
- implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
- def currentValue: Long = f.apply()
- }
+ def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector =
+ new DifferentialValueCollector(new CurrentValueCollector {
+ def currentValue: Long = wrappedValueCollector
+ })
}
class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
- val refreshValuesSchedule = new AtomicReference[Cancellable]()
+ private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]()
def record(value: Long): Unit = underlyingHistogram.record(value)
@@ -73,10 +92,15 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector
def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
def cleanup: Unit = {
- if (refreshValuesSchedule.get() != null)
- refreshValuesSchedule.get().cancel()
+ if (automaticValueCollectorSchedule.get() != null)
+ automaticValueCollectorSchedule.get().cancel()
}
- def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue)
+ def refreshValue(): Unit = {
+ val a = currentValueCollector.currentValue
+ if (a < 0)
+ println("RECORDING FROM GAUGE => " + a + " - " + currentValueCollector.getClass)
+ underlyingHistogram.record(a)
+ }
}
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
index bed75fc8..5c4c7f71 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -17,12 +17,11 @@
package kamon.metric.instrument
import java.nio.LongBuffer
-import com.typesafe.config.Config
import org.HdrHistogram.AtomicHistogramFieldsAccessor
+import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange }
import org.HdrHistogram.AtomicHistogram
-import kamon.metric._
-trait Histogram extends MetricRecorder {
+trait Histogram extends Instrument {
type SnapshotType = Histogram.Snapshot
def record(value: Long)
@@ -31,30 +30,40 @@ trait Histogram extends MetricRecorder {
object Histogram {
- def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram =
- new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale)
-
- def fromConfig(config: Config): Histogram = {
- fromConfig(config, Scale.Unit)
- }
-
- def fromConfig(config: Config, scale: Scale): Histogram = {
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
-
- new HdrHistogram(1L, highest, significantDigits, scale)
- }
-
- object HighestTrackableValue {
- val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L
- }
-
- case class Precision(significantDigits: Int)
- object Precision {
- val Low = Precision(1)
- val Normal = Precision(2)
- val Fine = Precision(3)
- }
+ /**
+ * Scala API:
+ *
+ * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given
+ * [[kamon.metric.instrument.Histogram.DynamicRange]].
+ */
+ def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange)
+
+ /**
+ * Java API:
+ *
+ * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given
+ * [[kamon.metric.instrument.Histogram.DynamicRange]].
+ */
+ def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange)
+
+ /**
+ * DynamicRange is a configuration object used to supply range and precision configuration to a
+ * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]]
+ * for more details on how it works and the effects of these configuration values.
+ *
+ * @param lowestDiscernibleValue
+ * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that
+ * is >= 1. May be internally rounded down to nearest power of 2.
+ *
+ * @param highestTrackableValue
+ * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue).
+ * Must not be larger than (Long.MAX_VALUE/2).
+ *
+ * @param precision
+ * The number of significant decimal digits to which the histogram will maintain value resolution and separation.
+ * Must be a non-negative integer between 1 and 3.
+ */
+ case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int)
trait Record {
def level: Long
@@ -67,29 +76,28 @@ object Histogram {
var rawCompactRecord: Long = 0L
}
- trait Snapshot extends MetricSnapshot {
- type SnapshotType = Histogram.Snapshot
+ trait Snapshot extends InstrumentSnapshot {
def isEmpty: Boolean = numberOfMeasurements == 0
- def scale: Scale
def numberOfMeasurements: Long
def min: Long
def max: Long
def sum: Long
def percentile(percentile: Double): Long
def recordsIterator: Iterator[Record]
+ def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
}
object Snapshot {
- def empty(targetScale: Scale) = new Snapshot {
+ val empty = new Snapshot {
override def min: Long = 0L
override def max: Long = 0L
override def sum: Long = 0L
override def percentile(percentile: Double): Long = 0L
override def recordsIterator: Iterator[Record] = Iterator.empty
- override def merge(that: Snapshot, context: CollectionContext): Snapshot = that
- override def scale: Scale = targetScale
+ override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that
+ override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that
override def numberOfMeasurements: Long = 0L
}
}
@@ -100,10 +108,8 @@ object Histogram {
* The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
* leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
*/
-class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit)
- extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits)
- with Histogram with AtomicHistogramFieldsAccessor {
-
+class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue,
+ dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor {
import AtomicHistogramFieldsAccessor.totalCountUpdater
def record(value: Long): Unit = recordValue(value)
@@ -119,7 +125,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign
val measurementsArray = Array.ofDim[Long](buffer.limit())
buffer.get(measurementsArray, 0, measurementsArray.length)
- new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
+ new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
}
def getCounts = countsArray().length()
@@ -160,7 +166,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign
}
-case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
+case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot {
def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0))
@@ -182,53 +188,61 @@ case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long,
percentileLevel
}
- def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = {
- if (that.isEmpty) this else if (this.isEmpty) that else {
- import context.buffer
- buffer.clear()
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot =
+ merge(that.asInstanceOf[InstrumentSnapshot], context)
- val selfIterator = recordsIterator
- val thatIterator = that.recordsIterator
- var thatCurrentRecord: Histogram.Record = null
- var mergedNumberOfMeasurements = 0L
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match {
+ case thatSnapshot: CompactHdrSnapshot ⇒
+ if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else {
+ import context.buffer
+ buffer.clear()
- def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
- def addToBuffer(compactRecord: Long): Unit = {
- mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
- buffer.put(compactRecord)
- }
+ val selfIterator = recordsIterator
+ val thatIterator = thatSnapshot.recordsIterator
+ var thatCurrentRecord: Histogram.Record = null
+ var mergedNumberOfMeasurements = 0L
- while (selfIterator.hasNext) {
- val selfCurrentRecord = selfIterator.next()
+ def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
+ def addToBuffer(compactRecord: Long): Unit = {
+ mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
+ buffer.put(compactRecord)
+ }
- // Advance that to no further than the level of selfCurrentRecord
- thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
- while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
- addToBuffer(thatCurrentRecord.rawCompactRecord)
- thatCurrentRecord = nextOrNull(thatIterator)
+ while (selfIterator.hasNext) {
+ val selfCurrentRecord = selfIterator.next()
+
+ // Advance that to no further than the level of selfCurrentRecord
+ thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
+ while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
+ addToBuffer(thatCurrentRecord.rawCompactRecord)
+ thatCurrentRecord = nextOrNull(thatIterator)
+ }
+
+ // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
+ if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
+ addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
+ thatCurrentRecord = nextOrNull(thatIterator)
+ } else {
+ addToBuffer(selfCurrentRecord.rawCompactRecord)
+ }
}
- // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
- if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
- addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
- thatCurrentRecord = nextOrNull(thatIterator)
- } else {
- addToBuffer(selfCurrentRecord.rawCompactRecord)
+ // Include everything that might have been left from that
+ if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
+ while (thatIterator.hasNext) {
+ addToBuffer(thatIterator.next().rawCompactRecord)
}
- }
- // Include everything that might have been left from that
- if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
- while (thatIterator.hasNext) {
- addToBuffer(thatIterator.next().rawCompactRecord)
+ buffer.flip()
+ val compactRecords = Array.ofDim[Long](buffer.limit())
+ buffer.get(compactRecords)
+
+ new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
}
- buffer.flip()
- val compactRecords = Array.ofDim[Long](buffer.limit())
- buffer.get(compactRecords)
+ case other ⇒
+ sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.")
- new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
- }
}
@inline private def mergeCompactRecords(left: Long, right: Long): Long = {
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
new file mode 100644
index 00000000..8cacc767
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
@@ -0,0 +1,56 @@
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+
+import akka.actor.{ Scheduler, Cancellable }
+import akka.dispatch.MessageDispatcher
+import scala.concurrent.duration.FiniteDuration
+
+private[kamon] trait Instrument {
+ type SnapshotType <: InstrumentSnapshot
+
+ def collect(context: CollectionContext): SnapshotType
+ def cleanup: Unit
+}
+
+trait InstrumentSnapshot {
+ def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
+}
+
+class InstrumentType private[kamon] (val id: Int) extends AnyVal
+object InstrumentTypes {
+ val Histogram = new InstrumentType(1)
+ val MinMaxCounter = new InstrumentType(2)
+ val Gauge = new InstrumentType(3)
+ val Counter = new InstrumentType(4)
+}
+
+trait CollectionContext {
+ def buffer: LongBuffer
+}
+
+object CollectionContext {
+ def apply(longBufferSize: Int): CollectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(longBufferSize)
+ }
+}
+
+trait RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
+}
+
+object RefreshScheduler {
+ val NoopScheduler = new RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable {
+ override def isCancelled: Boolean = true
+ override def cancel(): Boolean = true
+ }
+ }
+
+ def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
+ scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
+ }
+
+ def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
new file mode 100644
index 00000000..9b0c85cb
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
@@ -0,0 +1,35 @@
+package kamon.metric.instrument
+
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+
+import scala.concurrent.duration.FiniteDuration
+
+case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) {
+
+ private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = {
+ configurations.get(instrumentName).flatMap { customSettings ⇒
+ codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default)))
+
+ } getOrElse (codeSettings.getOrElse(default))
+ }
+
+ def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = {
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram)
+ Histogram(settings.dynamicRange)
+ }
+
+ def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = {
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter)
+ MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler)
+ }
+
+ def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None,
+ valueCollector: CurrentValueCollector): Gauge = {
+
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge)
+ Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector)
+ }
+
+ def createCounter(): Counter = Counter()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
new file mode 100644
index 00000000..1446a25d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
@@ -0,0 +1,67 @@
+package kamon.metric.instrument
+
+import java.util.concurrent.TimeUnit
+
+import com.typesafe.config.Config
+import kamon.metric.instrument.Histogram.DynamicRange
+
+import scala.concurrent.duration.FiniteDuration
+
+case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long],
+ precision: Option[Int], refreshInterval: Option[FiniteDuration]) {
+
+ def combine(that: InstrumentSettings): InstrumentSettings =
+ InstrumentSettings(
+ DynamicRange(
+ lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue),
+ highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue),
+ precision.getOrElse(that.dynamicRange.precision)),
+ refreshInterval.orElse(that.refreshInterval))
+}
+
+object InstrumentCustomSettings {
+ import scala.concurrent.duration._
+
+ def fromConfig(config: Config): InstrumentCustomSettings =
+ InstrumentCustomSettings(
+ if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None,
+ if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None,
+ if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None,
+ if (config.hasPath("refresh-interval")) Some(config.getDuration("refresh-interval", TimeUnit.NANOSECONDS).nanos) else None)
+
+}
+
+case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration])
+
+object InstrumentSettings {
+
+ def readDynamicRange(config: Config): DynamicRange =
+ DynamicRange(
+ config.getLong("lowest-discernible-value"),
+ config.getLong("highest-trackable-value"),
+ parsePrecision(config.getString("precision")))
+
+ def parsePrecision(stringValue: String): Int = stringValue match {
+ case "low" ⇒ 1
+ case "normal" ⇒ 2
+ case "fine" ⇒ 3
+ case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].")
+ }
+}
+
+case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings)
+
+object DefaultInstrumentSettings {
+
+ def fromConfig(config: Config): DefaultInstrumentSettings = {
+ import scala.concurrent.duration._
+
+ val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None)
+ val minMaxCounterSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")),
+ Some(config.getDuration("min-max-counter.refresh-interval", TimeUnit.NANOSECONDS).nanos))
+ val gaugeSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("gauge")),
+ Some(config.getDuration("gauge.refresh-interval", TimeUnit.NANOSECONDS).nanos))
+
+ DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings)
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
index 4882d2aa..0828c8a9 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
@@ -17,16 +17,14 @@ package kamon.metric.instrument
*/
import java.lang.Math.abs
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
-import akka.actor.{ ActorSystem, Cancellable }
-import com.typesafe.config.Config
+import akka.actor.Cancellable
import kamon.jsr166.LongMaxUpdater
-import kamon.metric.{ Scale, MetricRecorder, CollectionContext }
+import kamon.metric.instrument.Histogram.DynamicRange
import kamon.util.PaddedAtomicLong
import scala.concurrent.duration.FiniteDuration
-trait MinMaxCounter extends MetricRecorder {
+trait MinMaxCounter extends Instrument {
override type SnapshotType = Histogram.Snapshot
def increment(): Unit
@@ -38,29 +36,20 @@ trait MinMaxCounter extends MetricRecorder {
object MinMaxCounter {
- def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration,
- system: ActorSystem): MinMaxCounter = {
-
- val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = {
+ val underlyingHistogram = Histogram(dynamicRange)
val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram)
-
- val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
minMaxCounter.refreshValues()
- }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ })
minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule)
minMaxCounter
}
- def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = {
- import scala.concurrent.duration._
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter =
+ apply(dynamicRange, refreshInterval, scheduler)
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
- val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
-
- apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system)
- }
}
class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter {
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
new file mode 100644
index 00000000..cf6b8b4c
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
@@ -0,0 +1,55 @@
+package kamon.metric.instrument
+
+trait UnitOfMeasurement {
+ def name: String
+ def label: String
+ def factor: Double
+}
+
+object UnitOfMeasurement {
+ case object Unknown extends UnitOfMeasurement {
+ val name = "unknown"
+ val label = "unknown"
+ val factor = 1D
+ }
+
+ def isUnknown(uom: UnitOfMeasurement): Boolean =
+ uom == Unknown
+
+ def isTime(uom: UnitOfMeasurement): Boolean =
+ uom.isInstanceOf[Time]
+
+}
+
+case class Time(factor: Double, label: String) extends UnitOfMeasurement {
+ val name = "time"
+
+ /**
+ * Scale a value from this scale factor to a different scale factor.
+ *
+ * @param toUnit Time unit of the expected result.
+ * @param value Value to scale.
+ * @return Equivalent of value on the target time unit.
+ */
+ def scale(toUnit: Time)(value: Long): Double =
+ (value * factor) / toUnit.factor
+}
+
+object Time {
+ val Nanoseconds = Time(1E-9, "n")
+ val Microseconds = Time(1E-6, "µs")
+ val Milliseconds = Time(1E-3, "ms")
+ val Seconds = Time(1, "s")
+}
+
+case class Memory(factor: Double, label: String) extends UnitOfMeasurement {
+ val name = "bytes"
+}
+
+object Memory {
+ val Bytes = Memory(1, "b")
+ val KiloBytes = Memory(1024, "Kb")
+ val MegaBytes = Memory(1024E2, "Mb")
+ val GigaBytes = Memory(1024E3, "Gb")
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala
deleted file mode 100644
index 43166058..00000000
--- a/kamon-core/src/main/scala/kamon/metric/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon
-
-import scala.annotation.tailrec
-import com.typesafe.config.Config
-
-package object metric {
-
- @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = {
- if (right.isEmpty)
- left
- else {
- val (key, rightValue) = right.head
- val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue)
-
- combineMaps(left.updated(key, value), right.tail)(valueMerger)
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala
deleted file mode 100644
index 490bc127..00000000
--- a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-package kamon.standalone
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.Kamon
-import kamon.metric.UserMetrics
-import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
-
-import scala.concurrent.duration.FiniteDuration
-
-trait KamonStandalone {
- private[kamon] def system: ActorSystem
-
- def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram =
- Kamon(UserMetrics)(system).registerHistogram(name, precision, highestTrackableValue)
-
- def registerHistogram(name: String): Histogram =
- Kamon(UserMetrics)(system).registerHistogram(name)
-
- def registerCounter(name: String): Counter =
- Kamon(UserMetrics)(system).registerCounter(name)
-
- def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration): MinMaxCounter =
- Kamon(UserMetrics)(system).registerMinMaxCounter(name, precision, highestTrackableValue, refreshInterval)
-
- def registerMinMaxCounter(name: String): MinMaxCounter =
- Kamon(UserMetrics)(system).registerMinMaxCounter(name)
-
- def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
- Kamon(UserMetrics)(system).registerGauge(name)(currentValueCollector)
-
- def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
- refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
- Kamon(UserMetrics)(system).registerGauge(name, precision, highestTrackableValue, refreshInterval)(currentValueCollector)
-
- def removeHistogram(name: String): Unit =
- Kamon(UserMetrics)(system).removeHistogram(name)
-
- def removeCounter(name: String): Unit =
- Kamon(UserMetrics)(system).removeCounter(name)
-
- def removeMinMaxCounter(name: String): Unit =
- Kamon(UserMetrics)(system).removeMinMaxCounter(name)
-
- def removeGauge(name: String): Unit =
- Kamon(UserMetrics)(system).removeGauge(name)
-}
-
-object KamonStandalone {
-
- def buildFromConfig(config: Config): KamonStandalone = buildFromConfig(config, "kamon-standalone")
-
- def buildFromConfig(config: Config, actorSystemName: String): KamonStandalone = new KamonStandalone {
- val system: ActorSystem = ActorSystem(actorSystemName, config)
- }
-}
-
-object EmbeddedKamonStandalone extends KamonStandalone {
- private[kamon] lazy val system = ActorSystem("kamon-standalone")
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
index c39a9984..3b2a3bf9 100644
--- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
@@ -19,8 +19,8 @@ package kamon.trace
import java.util.concurrent.TimeUnit
import akka.actor.{ ActorLogging, Props, Actor, ActorRef }
-import kamon.{ NanoInterval, RelativeNanoTimestamp }
import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.duration._
diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
index 66c6633d..e62178dd 100644
--- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
@@ -18,16 +18,16 @@ package kamon.trace
import java.util.concurrent.ConcurrentLinkedQueue
-import akka.actor.ActorSystem
+import akka.actor.{ ExtensionId, ActorSystem }
import akka.event.LoggingAdapter
-import kamon.{ RelativeNanoTimestamp, NanoInterval }
-import kamon.metric.TraceMetrics.TraceMetricRecorder
+import kamon.Kamon.Extension
import kamon.metric.{ MetricsExtension, TraceMetrics }
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
import scala.annotation.tailrec
-private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin,
- val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem)
+private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
+ val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val actorSystem: ActorSystem)
extends TraceContext {
@volatile private var _name = traceName
@@ -48,35 +48,36 @@ private[trace] class MetricsOnlyContext(traceName: String, val token: String, iz
def isOpen: Boolean = _isOpen
def addMetadata(key: String, value: String): Unit = {}
+ def lookupExtension[T <: Extension](id: ExtensionId[T]): T = id(actorSystem)
+
def finish(): Unit = {
_isOpen = false
- val traceElapsedTime = NanoInterval.since(startRelativeTimestamp)
+ val traceElapsedTime = NanoInterval.since(startTimestamp)
_elapsedTime = traceElapsedTime
- val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
- metricRecorder.map { traceMetrics ⇒
- traceMetrics.elapsedTime.record(traceElapsedTime.nanos)
- drainFinishedSegments(traceMetrics)
+ metricsExtension.register(TraceMetrics, name).map { registration ⇒
+ registration.recorder.ElapsedTime.record(traceElapsedTime.nanos)
+ drainFinishedSegments(registration.recorder)
}
}
def startSegment(segmentName: String, category: String, library: String): Segment =
new MetricsOnlySegment(segmentName, category, library)
- @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
+ @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = {
val segment = _finishedSegments.poll()
if (segment != null) {
- metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos)
- drainFinishedSegments(metricRecorder)
+ recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos)
+ drainFinishedSegments(recorder)
}
}
protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
- _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration))
+ _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration))
if (isClosed) {
- metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
- drainFinishedSegments(traceMetrics)
+ metricsExtension.register(TraceMetrics, name).map { registration ⇒
+ drainFinishedSegments(registration.recorder)
}
}
}
@@ -118,4 +119,6 @@ private[trace] class MetricsOnlyContext(traceName: String, val token: String, iz
def elapsedTime: NanoInterval = _elapsedTime
def startTimestamp: RelativeNanoTimestamp = _startTimestamp
}
-} \ No newline at end of file
+}
+
+case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval)
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
index 2308d1d0..5abba221 100644
--- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -16,8 +16,7 @@
package kamon.trace
-import kamon.NanoInterval
-import kamon.util.Sequencer
+import kamon.util.{ NanoInterval, Sequencer }
import scala.concurrent.forkjoin.ThreadLocalRandom
trait Sampler {
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 60244eaa..ed8170a9 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -17,26 +17,60 @@
package kamon.trace
import java.io.ObjectStreamException
-import akka.actor.ActorSystem
+import akka.actor.{ ExtensionId, ActorSystem }
+import kamon.Kamon.Extension
import kamon._
import kamon.metric._
import kamon.trace.TraceContextAware.DefaultTraceContextAware
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
trait TraceContext {
def name: String
def token: String
- def origin: TraceContextOrigin
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
def isOpen: Boolean
def isClosed: Boolean = !isOpen
- def system: ActorSystem
def finish(): Unit
def rename(newName: String): Unit
+
def startSegment(segmentName: String, category: String, library: String): Segment
def addMetadata(key: String, value: String)
- def startRelativeTimestamp: RelativeNanoTimestamp
+
+ def startTimestamp: RelativeNanoTimestamp
+
+ def lookupExtension[T <: Kamon.Extension](id: ExtensionId[T]): T
+}
+
+object TraceContext {
+ private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] {
+ override def initialValue(): TraceContext = EmptyTraceContext
+ }
+
+ def currentContext: TraceContext =
+ _traceContextStorage.get()
+
+ def setCurrentContext(context: TraceContext): Unit =
+ _traceContextStorage.set(context)
+
+ def clearCurrentContext: Unit =
+ _traceContextStorage.remove()
+
+ def withContext[T](context: TraceContext)(code: ⇒ T): T = {
+ val oldContext = _traceContextStorage.get()
+ _traceContextStorage.set(context)
+
+ try code finally _traceContextStorage.set(oldContext)
+ }
+
+ def map[T](f: TraceContext ⇒ T): Option[T] = {
+ val current = currentContext
+ if (current.nonEmpty)
+ Some(f(current))
+ else None
+ }
+
}
trait Segment {
@@ -56,16 +90,17 @@ trait Segment {
case object EmptyTraceContext extends TraceContext {
def name: String = "empty-trace"
def token: String = ""
- def origin: TraceContextOrigin = TraceContextOrigin.Local
def isEmpty: Boolean = true
def isOpen: Boolean = false
- def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.")
def finish(): Unit = {}
def rename(name: String): Unit = {}
def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment
def addMetadata(key: String, value: String): Unit = {}
- def startRelativeTimestamp = new RelativeNanoTimestamp(0L)
+ def startTimestamp = new RelativeNanoTimestamp(0L)
+
+ override def lookupExtension[T <: Extension](id: ExtensionId[T]): T =
+ sys.error("Can't lookup extensions on a EmptyTraceContext.")
case object EmptySegment extends Segment {
val name: String = "empty-segment"
@@ -80,14 +115,17 @@ case object EmptyTraceContext extends TraceContext {
}
}
-case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity
-case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval)
-
object SegmentCategory {
val HttpClient = "http-client"
val Database = "database"
}
+class LOD private[trace] (val level: Int) extends AnyVal
+object LOD {
+ val MetricsOnly = new LOD(1)
+ val SimpleTrace = new LOD(2)
+}
+
sealed trait LevelOfDetail
object LevelOfDetail {
case object MetricsOnly extends LevelOfDetail
@@ -95,12 +133,6 @@ object LevelOfDetail {
case object FullTrace extends LevelOfDetail
}
-sealed trait TraceContextOrigin
-object TraceContextOrigin {
- case object Local extends TraceContextOrigin
- case object Remote extends TraceContextOrigin
-}
-
trait TraceContextAware extends Serializable {
def traceContext: TraceContext
}
@@ -109,7 +141,7 @@ object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
- @transient val traceContext = TraceRecorder.currentContext
+ @transient val traceContext = TraceContext.currentContext
//
// Beware of this hack, it might bite us in the future!
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
deleted file mode 100644
index 41f022d0..00000000
--- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import akka.actor
-import akka.event.Logging
-import kamon._
-import kamon.metric.Metrics
-import kamon.util.GlobPathFilter
-
-class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config.getConfig("kamon.trace")
- val dispatcher = system.dispatchers.lookup(config.getString("dispatcher"))
-
- val detailLevel: LevelOfDetail = config.getString("level") match {
- case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
- case "simple-trace" ⇒ LevelOfDetail.SimpleTrace
- case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.")
- }
-
- val sampler: Sampler =
- if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling
- else config.getString("sampling") match {
- case "all" ⇒ SampleAll
- case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance"))
- case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval"))
- case "threshold" ⇒ new ThresholdSampler(config.getDuration("threshold-sampler.minimum-elapsed-time", TimeUnit.NANOSECONDS))
- }
-
- val log = Logging(system, "TraceExtension")
- val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
- val incubator = system.actorOf(Incubator.props(subscriptions))
- val metricsExtension = Kamon(Metrics)(system)
-
- def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext =
- newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system)
-
- def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin,
- startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = {
- def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system)
-
- if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote)
- newMetricsOnlyContext
- else {
- if (!sampler.shouldTrace)
- newMetricsOnlyContext
- else
- new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system)
- }
- }
-
- def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber)
- def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber)
-
- private[kamon] def dispatchTracingContext(trace: TracingContext): Unit =
- if (sampler.shouldReport(trace.elapsedTime))
- if (trace.shouldIncubate)
- incubator ! trace
- else
- subscriptions ! trace.generateTraceInfo
-
-}
-
-object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: actor.Extension] = Trace
- def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
-
- case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
- def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
- }
-}
-
-case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo])
-case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
index 84e234f3..057f564e 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -42,12 +42,12 @@ object TraceLocal {
object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext }
- def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match {
+ def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceContext.currentContext match {
case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value)
case EmptyTraceContext ⇒ // Can't store in the empty context.
}
- def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match {
+ def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceContext.currentContext match {
case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key)
case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
deleted file mode 100644
index 703896c3..00000000
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon }
-
-import scala.language.experimental.macros
-import java.util.concurrent.atomic.AtomicLong
-import kamon.macros.InlineTraceContextMacro
-
-import scala.util.Try
-import java.net.InetAddress
-import akka.actor.ActorSystem
-
-object TraceRecorder {
- private val traceContextStorage = new ThreadLocal[TraceContext] {
- override def initialValue(): TraceContext = EmptyTraceContext
- }
-
- private val tokenCounter = new AtomicLong
- private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
-
- def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet())
-
- private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext =
- Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system)
-
- def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = {
- val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp)
- Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system)
- }
-
- def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
-
- def clearContext: Unit = traceContextStorage.set(EmptyTraceContext)
-
- def currentContext: TraceContext = traceContextStorage.get()
-
- def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = {
- val ctx = newTraceContext(name, token, system)
- traceContextStorage.set(ctx)
- }
-
- def rename(name: String): Unit = currentContext.rename(name)
-
- def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T =
- withTraceContext(newTraceContext(name, token, system))(thunk)
-
- def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = {
- val oldContext = currentContext
- setContext(context)
-
- try thunk finally setContext(oldContext)
- }
-
- def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match {
- case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system))
- case EmptyTraceContext ⇒ None
- }
-
- def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext]
-
- def finish(): Unit = currentContext.finish()
-
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
new file mode 100644
index 00000000..41dcd6bc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
@@ -0,0 +1,94 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.net.InetAddress
+import java.util.concurrent.atomic.AtomicLong
+
+import akka.actor._
+import akka.actor
+import kamon.Kamon
+import kamon.metric.{ Metrics, MetricsExtension }
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp, GlobPathFilter }
+
+import scala.util.Try
+
+object Tracer extends ExtensionId[TracerExtension] with ExtensionIdProvider {
+ override def get(system: ActorSystem): TracerExtension = super.get(system)
+ def lookup(): ExtensionId[_ <: actor.Extension] = Tracer
+ def createExtension(system: ExtendedActorSystem): TracerExtension = new TracerExtensionImpl(system)
+}
+
+trait TracerExtension extends Kamon.Extension {
+ def newContext(name: String): TraceContext
+ def newContext(name: String, token: String): TraceContext
+ def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext
+
+ def subscribe(subscriber: ActorRef): Unit
+ def unsubscribe(subscriber: ActorRef): Unit
+}
+
+class TracerExtensionImpl(system: ExtendedActorSystem) extends TracerExtension {
+ private val _settings = TraceSettings(system)
+ private val _metricsExtension = Metrics.get(system)
+
+ private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
+ private val _tokenCounter = new AtomicLong
+ private val _subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
+ private val _incubator = system.actorOf(Incubator.props(_subscriptions))
+
+ private def newToken: String =
+ _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet())
+
+ def newContext(name: String): TraceContext =
+ createTraceContext(name)
+
+ def newContext(name: String, token: String): TraceContext =
+ createTraceContext(name, token)
+
+ def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext =
+ createTraceContext(name, token, timestamp, isOpen, isLocal)
+
+ private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now,
+ isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = {
+
+ def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, _metricsExtension, system)
+
+ if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal)
+ newMetricsOnlyContext
+ else {
+ if (!_settings.sampler.shouldTrace)
+ newMetricsOnlyContext
+ else
+ new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, _metricsExtension, this, system, dispatchTracingContext)
+ }
+ }
+
+ def subscribe(subscriber: ActorRef): Unit = _subscriptions ! TraceSubscriptions.Subscribe(subscriber)
+ def unsubscribe(subscriber: ActorRef): Unit = _subscriptions ! TraceSubscriptions.Unsubscribe(subscriber)
+
+ private[kamon] def dispatchTracingContext(trace: TracingContext): Unit =
+ if (_settings.sampler.shouldReport(trace.elapsedTime))
+ if (trace.shouldIncubate)
+ _incubator ! trace
+ else
+ _subscriptions ! trace.generateTraceInfo
+
+}
+
+case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo])
+case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
new file mode 100644
index 00000000..e6c2d3ef
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
@@ -0,0 +1,30 @@
+package kamon.trace
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+
+case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler)
+
+object TraceSettings {
+ def apply(system: ActorSystem): TraceSettings = {
+ val tracerConfig = system.settings.config.getConfig("kamon.trace")
+
+ val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match {
+ case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
+ case "simple-trace" ⇒ LevelOfDetail.SimpleTrace
+ case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.")
+ }
+
+ val sampler: Sampler =
+ if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling
+ else tracerConfig.getString("sampling") match {
+ case "all" ⇒ SampleAll
+ case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance"))
+ case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.interval"))
+ case "threshold" ⇒ new ThresholdSampler(tracerConfig.getDuration("threshold-sampler.minimum-elapsed-time", TimeUnit.NANOSECONDS))
+ }
+
+ TraceSettings(detailLevel, sampler)
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
index 31ab282d..dd4c3c1a 100644
--- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
@@ -21,14 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
-import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp }
+import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp }
import kamon.metric.MetricsExtension
import scala.collection.concurrent.TrieMap
-private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin,
- startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem)
- extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) {
+private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail,
+ isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension,
+ traceExtension: TracerExtensionImpl, system: ActorSystem, traceInfoSink: TracingContext ⇒ Unit)
+ extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension, system) {
private val _openSegments = new AtomicInteger(0)
private val _startTimestamp = NanoTimestamp.now
@@ -46,7 +47,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo
override def finish(): Unit = {
super.finish()
- traceExtension.dispatchTracingContext(this)
+ traceInfoSink(this)
}
override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
@@ -66,7 +67,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo
while (currentSegments.hasNext()) {
val segment = currentSegments.next()
if (segment.isClosed)
- segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp)
+ segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp)
else
log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name)
}
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
index f052f009..961c3099 100644
--- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala
@@ -17,11 +17,11 @@ package kamon.trace.logging
import ch.qos.logback.classic.pattern.ClassicConverter
import ch.qos.logback.classic.spi.ILoggingEvent
-import kamon.trace.TraceRecorder
+import kamon.trace.TraceContext
class LogbackTraceTokenConverter extends ClassicConverter {
def convert(event: ILoggingEvent): String = {
- val ctx = TraceRecorder.currentContext
+ val ctx = TraceContext.currentContext
if (ctx.isEmpty)
"undefined"
else
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
index 4f4efa4d..4970d97e 100644
--- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
@@ -17,14 +17,14 @@
package kamon.trace.logging
import kamon.trace.TraceLocal.AvailableToMdc
-import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder }
+import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext }
import org.slf4j.MDC
trait MdcKeysSupport {
def withMdc[A](thunk: ⇒ A): A = {
- val keys = copyToMdc(TraceRecorder.currentContext)
+ val keys = copyToMdc(TraceContext.currentContext)
try thunk finally keys.foreach(key ⇒ MDC.remove(key))
}
diff --git a/kamon-core/src/main/scala/kamon/util/ConfigTools.scala b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala
new file mode 100644
index 00000000..9810428e
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/ConfigTools.scala
@@ -0,0 +1,26 @@
+package kamon.util
+
+import java.util.concurrent.TimeUnit
+
+import com.typesafe.config.Config
+
+import scala.concurrent.duration.FiniteDuration
+
+object ConfigTools {
+ implicit class Syntax(val config: Config) extends AnyVal {
+ // We are using the deprecated .getNanoseconds option to keep Kamon source code compatible with
+ // versions of Akka using older typesafe-config versions.
+
+ def getFiniteDuration(path: String): FiniteDuration =
+ FiniteDuration(config.getNanoseconds(path), TimeUnit.NANOSECONDS)
+
+ def firstLevelKeys: Set[String] = {
+ import scala.collection.JavaConverters._
+
+ config.entrySet().asScala.map {
+ case entry ⇒ entry.getKey.takeWhile(_ != '.')
+ } toSet
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/util/FastDispatch.scala b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala
new file mode 100644
index 00000000..8f23164a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/FastDispatch.scala
@@ -0,0 +1,22 @@
+package kamon.util
+
+import akka.actor.ActorRef
+
+import scala.concurrent.{ ExecutionContext, Future }
+
+/**
+ * Extension for Future[ActorRef]. Try to dispatch a message to a Future[ActorRef] in the same thread if it has already
+ * completed or do the regular scheduling otherwise. Specially useful when using the ModuleSupervisor extension to
+ * create actors.
+ */
+object FastDispatch {
+ implicit class Syntax(val target: Future[ActorRef]) extends AnyVal {
+
+ def fastDispatch(message: Any)(implicit ec: ExecutionContext): Unit =
+ if (target.isCompleted)
+ target.value.get.map(_ ! message)
+ else
+ target.map(_ ! message)
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/util/MapMerge.scala b/kamon-core/src/main/scala/kamon/util/MapMerge.scala
new file mode 100644
index 00000000..b7f18788
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/MapMerge.scala
@@ -0,0 +1,27 @@
+package kamon.util
+
+object MapMerge {
+
+ /**
+ * Merge to immutable maps with the same key and value types, using the provided valueMerge function.
+ */
+ implicit class Syntax[K, V](val map: Map[K, V]) extends AnyVal {
+ def merge(that: Map[K, V], valueMerge: (V, V) ⇒ V): Map[K, V] = {
+ val merged = Map.newBuilder[K, V]
+
+ map.foreach {
+ case (key, value) ⇒
+ val mergedValue = that.get(key).map(v ⇒ valueMerge(value, v)).getOrElse(value)
+ merged += key -> mergedValue
+ }
+
+ that.foreach {
+ case kv @ (key, _) if !map.contains(key) ⇒ merged += kv
+ case other ⇒ // ignore, already included.
+ }
+
+ merged.result();
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/util/Timestamp.scala
index f2933a11..4ff031a6 100644
--- a/kamon-core/src/main/scala/kamon/TimeUnits.scala
+++ b/kamon-core/src/main/scala/kamon/util/Timestamp.scala
@@ -1,7 +1,7 @@
-package kamon
+package kamon.util
/**
- * Epoch time stamp in seconds.
+ * Epoch time stamp.
*/
class Timestamp(val seconds: Long) extends AnyVal {
def <(that: Timestamp): Boolean = this.seconds < that.seconds
@@ -24,7 +24,12 @@ object Timestamp {
*/
class MilliTimestamp(val millis: Long) extends AnyVal {
override def toString: String = String.valueOf(millis) + ".millis"
+
def toTimestamp: Timestamp = new Timestamp(millis / 1000)
+ def toRelativeNanoTimestamp: RelativeNanoTimestamp = {
+ val diff = (System.currentTimeMillis() - millis) * 1000000
+ new RelativeNanoTimestamp(System.nanoTime() - diff)
+ }
}
object MilliTimestamp {
@@ -50,6 +55,9 @@ object NanoTimestamp {
*/
class RelativeNanoTimestamp(val nanos: Long) extends AnyVal {
override def toString: String = String.valueOf(nanos) + ".nanos"
+
+ def toMilliTimestamp: MilliTimestamp =
+ new MilliTimestamp(System.currentTimeMillis - ((System.nanoTime - nanos) / 1000000))
}
object RelativeNanoTimestamp {
diff --git a/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala
new file mode 100644
index 00000000..cd457cdc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala
@@ -0,0 +1,18 @@
+package kamon.util
+
+import scala.collection.concurrent.TrieMap
+
+object TriemapAtomicGetOrElseUpdate {
+
+ /**
+ * Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate]] method. More details on
+ * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]].
+ */
+ implicit class Syntax[K, V](val trieMap: TrieMap[K, V]) extends AnyVal {
+ def atomicGetOrElseUpdate(key: K, op: ⇒ V): V =
+ trieMap.get(key) match {
+ case Some(v) ⇒ v
+ case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d)
+ }
+ }
+}
diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml
index eb578346..dd623d61 100644
--- a/kamon-core/src/test/resources/logback.xml
+++ b/kamon-core/src/test/resources/logback.xml
@@ -1,17 +1,17 @@
<configuration scan="true">
+ <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+ <resetJUL>true</resetJUL>
+ </contextListener>
- <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
- <resetJUL>true</resetJUL>
- </contextListener>
+ <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="debug">
- <appender-ref ref="STDOUT" />
- </root>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern>
+ </encoder>
+ </appender>
+ <root level="error">
+ <appender-ref ref="STDOUT"/>
+ </root>
</configuration>
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala
deleted file mode 100644
index 31afd3ff..00000000
--- a/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.instrumentation.scala
-
-import akka.actor.ActorSystem
-import akka.testkit.TestKit
-import kamon.trace.TraceRecorder
-import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
-import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
-
-import scala.concurrent.Future
-
-class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers
- with ScalaFutures with PatienceConfiguration with OptionValues {
-
- implicit val execContext = system.dispatcher
-
- "a Future created with FutureTracing" should {
- "capture the TraceContext available when created" which {
- "must be available when executing the future's body" in {
-
- val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
- val future = Future(TraceRecorder.currentContext)
-
- (future, TraceRecorder.currentContext)
- }
-
- whenReady(future)(ctxInFuture ⇒
- ctxInFuture should equal(testTraceContext))
- }
-
- "must be available when executing callbacks on the future" in {
-
- val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
- val future = Future("Hello Kamon!")
- // The TraceContext is expected to be available during all intermediate processing.
- .map(_.length)
- .flatMap(len ⇒ Future(len.toString))
- .map(s ⇒ TraceRecorder.currentContext)
-
- (future, TraceRecorder.currentContext)
- }
-
- whenReady(future)(ctxInFuture ⇒
- ctxInFuture should equal(testTraceContext))
- }
- }
- }
-}
-
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala
deleted file mode 100644
index 29bf96f8..00000000
--- a/kamon-core/src/test/scala/kamon/instrumentation/scalaz/FutureInstrumentationSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.instrumentation.scalaz
-
-import akka.actor.ActorSystem
-import akka.testkit.TestKit
-import kamon.trace.TraceRecorder
-import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
-import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
-import scalaz.concurrent.Future
-import java.util.concurrent.Executors
-
-class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers
- with ScalaFutures with PatienceConfiguration with OptionValues {
-
- implicit val execContext = Executors.newCachedThreadPool()
-
- "a Future created with FutureTracing" should {
- "capture the TraceContext available when created" which {
- "must be available when executing the future's body" in {
-
- val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
- val future = Future(TraceRecorder.currentContext).start
-
- (future, TraceRecorder.currentContext)
- }
-
- val ctxInFuture = future.run
- ctxInFuture should equal(testTraceContext)
- }
-
- "must be available when executing callbacks on the future" in {
-
- val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
- val future = Future("Hello Kamon!")
- // The TraceContext is expected to be available during all intermediate processing.
- .map(_.length)
- .flatMap(len ⇒ Future(len.toString))
- .map(s ⇒ TraceRecorder.currentContext)
-
- (future.start, TraceRecorder.currentContext)
- }
-
- val ctxInFuture = future.run
- ctxInFuture should equal(testTraceContext)
- }
- }
- }
-}
-
diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
index 9144725e..40200685 100644
--- a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
@@ -1,128 +1,110 @@
package kamon.metric
import akka.actor._
-import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase }
+import akka.testkit.{ TestProbe, ImplicitSender }
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
-class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- |}
- """.stripMargin))
+class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-spec") with ImplicitSender {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ |}
+ """.stripMargin)
- val metricsExtension = Kamon(Metrics)(system)
- import metricsExtension.{ register, subscribe, unsubscribe }
+ val metricsModule = kamon.metrics
+ import metricsModule.{ register, subscribe, unsubscribe }
"the Subscriptions messaging protocol" should {
"allow subscribing for a single tick" in {
val subscriber = TestProbe()
- register(TraceMetrics("one-shot"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false)
+ register(TraceMetrics, "one-shot")
+ subscribe("trace", "one-shot", subscriber.ref, permanently = false)
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+ tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace"))
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
subscriber.expectNoMsg(1 second)
}
"allow subscribing permanently to a metric" in {
val subscriber = TestProbe()
- register(TraceMetrics("permanent"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true)
+ register(TraceMetrics, "permanent")
+ subscribe("trace", "permanent", subscriber.ref, permanently = true)
for (repetition ← 1 to 5) {
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("permanent"))
- subscriber.expectNoMsg(1 second)
+ tickSnapshot.metrics.keys should contain(Entity("permanent", "trace"))
}
}
"allow subscribing to metrics matching a glob pattern" in {
val subscriber = TestProbe()
- register(TraceMetrics("include-one"), TraceMetrics.Factory)
- register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
- register(TraceMetrics("include-three"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true)
+ register(TraceMetrics, "include-one")
+ register(TraceMetrics, "exclude-two")
+ register(TraceMetrics, "include-three")
+ subscribe("trace", "include-*", subscriber.ref, permanently = true)
for (repetition ← 1 to 5) {
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(2)
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-one"))
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-three"))
- subscriber.expectNoMsg(1 second)
+ tickSnapshot.metrics.keys should contain(Entity("include-one", "trace"))
+ tickSnapshot.metrics.keys should contain(Entity("include-three", "trace"))
}
}
"send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in {
val subscriber = TestProbe()
- register(TraceMetrics("include-one"), TraceMetrics.Factory)
- register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
- register(TraceMetrics("include-three"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true)
- subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true)
+ register(TraceMetrics, "include-one")
+ register(TraceMetrics, "exclude-two")
+ register(TraceMetrics, "include-three")
+ subscribe("trace", "include-one", subscriber.ref, permanently = true)
+ subscribe("trace", "include-three", subscriber.ref, permanently = true)
for (repetition ← 1 to 5) {
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(2)
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-one"))
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-three"))
+ tickSnapshot.metrics.keys should contain(Entity("include-one", "trace"))
+ tickSnapshot.metrics.keys should contain(Entity("include-three", "trace"))
}
}
"allow un-subscribing a subscriber" in {
val subscriber = TestProbe()
- register(TraceMetrics("one-shot"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true)
+ register(TraceMetrics, "one-shot")
+ subscribe("trace", "one-shot", subscriber.ref, permanently = true)
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+ tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace"))
unsubscribe(subscriber.ref)
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
subscriber.expectNoMsg(1 second)
}
+ }
- "watch all subscribers and un-subscribe them if they die" in {
- val subscriber = TestProbe()
- val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref)))
- watch(forwarderSubscriber)
- register(TraceMetrics("one-shot"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true)
-
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
- tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
-
- forwarderSubscriber ! PoisonPill
- expectTerminated(forwarderSubscriber)
-
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- subscriber.expectNoMsg(2 seconds)
- }
+ def subscriptionsActor: ActorRef = {
+ val listener = TestProbe()
+ system.actorSelection("/user/kamon/kamon-metrics").tell(Identify(1), listener.ref)
+ listener.expectMsgType[ActorIdentity].ref.get
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
index a9197ab5..2e1f246d 100644
--- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
@@ -17,32 +17,29 @@
package kamon.metric
import com.typesafe.config.ConfigFactory
-import kamon.{ MilliTimestamp, Kamon }
-import kamon.metric.instrument.Histogram
import kamon.metric.instrument.Histogram.MutableRecord
-import org.scalatest.{ Matchers, WordSpecLike }
-import akka.testkit.{ ImplicitSender, TestKitBase }
-import akka.actor.ActorSystem
+import kamon.testkit.BaseKamonSpec
+import kamon.util.MilliTimestamp
+import akka.testkit.ImplicitSender
import scala.concurrent.duration._
-import kamon.metric.Subscriptions.TickMetricSnapshot
-
-class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- | default-collection-context-buffer-size = 10
- |
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = [ "non-tracked-trace"]
- | }
- | }
- | ]
- |}
- """.stripMargin))
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+
+class TickMetricSnapshotBufferSpec extends BaseKamonSpec("trace-metrics-spec") with ImplicitSender {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ | default-collection-context-buffer-size = 10
+ |
+ | filters {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace" ]
+ | }
+ | }
+ |}
+ """.stripMargin)
"the TickMetricSnapshotBuffer" should {
"merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures {
@@ -74,7 +71,7 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma
mergedSnapshot.to.millis should equal(4000)
mergedSnapshot.metrics should not be ('empty)
- val testMetricSnapshot = mergedSnapshot.metrics(testTraceIdentity).metrics(TraceMetrics.ElapsedTime).asInstanceOf[Histogram.Snapshot]
+ val testMetricSnapshot = mergedSnapshot.metrics(testTraceIdentity).histogram("elapsed-time").get
testMetricSnapshot.min should equal(10)
testMetricSnapshot.max should equal(300)
testMetricSnapshot.numberOfMeasurements should equal(6)
@@ -88,23 +85,23 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma
}
trait SnapshotFixtures {
- val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
- val testTraceIdentity = TraceMetrics("buffer-spec-test-trace")
- val traceRecorder = Kamon(Metrics).register(testTraceIdentity, TraceMetrics.Factory).get
+ val collectionContext = kamon.metrics.buildDefaultCollectionContext
+ val testTraceIdentity = Entity("buffer-spec-test-trace", "trace")
+ val traceRecorder = kamon.metrics.register(TraceMetrics, "buffer-spec-test-trace").get.recorder
val firstEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map.empty)
val secondEmpty = TickMetricSnapshot(new MilliTimestamp(2000), new MilliTimestamp(3000), Map.empty)
val thirdEmpty = TickMetricSnapshot(new MilliTimestamp(3000), new MilliTimestamp(4000), Map.empty)
- traceRecorder.elapsedTime.record(10L)
- traceRecorder.elapsedTime.record(20L)
- traceRecorder.elapsedTime.record(30L)
+ traceRecorder.ElapsedTime.record(10L)
+ traceRecorder.ElapsedTime.record(20L)
+ traceRecorder.ElapsedTime.record(30L)
val firstNonEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map(
(testTraceIdentity -> traceRecorder.collect(collectionContext))))
- traceRecorder.elapsedTime.record(10L)
- traceRecorder.elapsedTime.record(10L)
- traceRecorder.elapsedTime.record(300L)
+ traceRecorder.ElapsedTime.record(10L)
+ traceRecorder.ElapsedTime.record(10L)
+ traceRecorder.ElapsedTime.record(300L)
val secondNonEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map(
(testTraceIdentity -> traceRecorder.collect(collectionContext))))
}
diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
index cd10f2d3..793c0112 100644
--- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
@@ -1,92 +1,83 @@
package kamon.metric
-import akka.actor.ActorSystem
-import akka.testkit.{ ImplicitSender, TestKitBase }
+import akka.testkit.ImplicitSender
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-import kamon.trace.{ SegmentMetricIdentity, TraceRecorder }
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.testkit.BaseKamonSpec
+import kamon.trace.TraceContext
+import kamon.metric.instrument.Histogram
-class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- | default-collection-context-buffer-size = 10
- |
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = [ "non-tracked-trace"]
- | }
- | }
- | ]
- | precision {
- | default-histogram-precision {
- | highest-trackable-value = 3600000000000
- | significant-value-digits = 2
- | }
- |
- | default-min-max-counter-precision {
- | refresh-interval = 1 second
- | highest-trackable-value = 999999999
- | significant-value-digits = 2
- | }
- | }
- |}
- """.stripMargin))
+class TraceMetricsSpec extends BaseKamonSpec("trace-metrics-spec") with ImplicitSender {
+ import TraceMetricsSpec.SegmentSyntax
+
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ | default-collection-context-buffer-size = 10
+ |
+ | filters {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ |}
+ """.stripMargin)
"the TraceMetrics" should {
"record the elapsed time between a trace creation and finish" in {
for (repetitions ← 1 to 10) {
- TraceRecorder.withNewTraceContext("record-elapsed-time") {
- TraceRecorder.finish()
+ TraceContext.withContext(newContext("record-elapsed-time")) {
+ TraceContext.currentContext.finish()
}
}
- val snapshot = takeSnapshotOf("record-elapsed-time")
- snapshot.elapsedTime.numberOfMeasurements should be(10)
- snapshot.segments shouldBe empty
+ val snapshot = takeSnapshotOf("record-elapsed-time", "trace")
+ snapshot.histogram("elapsed-time").get.numberOfMeasurements should be(10)
}
"record the elapsed time for segments that occur inside a given trace" in {
- TraceRecorder.withNewTraceContext("trace-with-segments") {
- val segment = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library")
+ TraceContext.withContext(newContext("trace-with-segments")) {
+ val segment = TraceContext.currentContext.startSegment("test-segment", "test-category", "test-library")
segment.finish()
- TraceRecorder.finish()
+ TraceContext.currentContext.finish()
}
- val snapshot = takeSnapshotOf("trace-with-segments")
- snapshot.elapsedTime.numberOfMeasurements should be(1)
+ val snapshot = takeSnapshotOf("trace-with-segments", "trace")
+ snapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
snapshot.segments.size should be(1)
- snapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1)
+ snapshot.segment("test-segment", "test-category", "test-library").numberOfMeasurements should be(1)
}
"record the elapsed time for segments that finish after their correspondent trace has finished" in {
- val segment = TraceRecorder.withNewTraceContext("closing-segment-after-trace") {
- val s = TraceRecorder.currentContext.startSegment("test-segment", "test-category", "test-library")
- TraceRecorder.finish()
+ val segment = TraceContext.withContext(newContext("closing-segment-after-trace")) {
+ val s = TraceContext.currentContext.startSegment("test-segment", "test-category", "test-library")
+ TraceContext.currentContext.finish()
s
}
- val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace")
- beforeFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace", "trace")
+ beforeFinishSegmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
beforeFinishSegmentSnapshot.segments.size should be(0)
segment.finish()
- val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace")
- afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0)
+ val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace", "trace")
+ afterFinishSegmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(0)
afterFinishSegmentSnapshot.segments.size should be(1)
- afterFinishSegmentSnapshot.segments(SegmentMetricIdentity("test-segment", "test-category", "test-library")).numberOfMeasurements should be(1)
+ afterFinishSegmentSnapshot.segment("test-segment", "test-category", "test-library").numberOfMeasurements should be(1)
}
}
+}
+
+object TraceMetricsSpec {
+ implicit class SegmentSyntax(val entitySnapshot: EntitySnapshot) extends AnyVal {
+ def segments: Map[HistogramKey, Histogram.Snapshot] = {
+ entitySnapshot.histograms.filterKeys(_.metadata.contains("category"))
+ }
- def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
- val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
- val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
- recorder.get.collect(collectionContext)
+ def segment(name: String, category: String, library: String): Histogram.Snapshot =
+ segments(TraceMetrics.segmentKey(name, category, library))
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
index 6c4fe3fb..a345c6a9 100644
--- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
@@ -1,310 +1,110 @@
package kamon.metric
-import akka.actor.{ Props, ActorSystem }
-import akka.testkit.{ ImplicitSender, TestKitBase }
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.UserMetrics._
-import kamon.metric.instrument.{ Histogram, Counter, MinMaxCounter, Gauge }
-import kamon.metric.instrument.Histogram.MutableRecord
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
-class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- | default-collection-context-buffer-size = 10
- |
- | precision {
- | default-histogram-precision {
- | highest-trackable-value = 10000
- | significant-value-digits = 2
- | }
- |
- | default-min-max-counter-precision {
- | refresh-interval = 1 hour
- | highest-trackable-value = 1000
- | significant-value-digits = 2
- | }
- |
- | default-gauge-precision {
- | refresh-interval = 1 hour
- | highest-trackable-value = 999999999
- | significant-value-digits = 2
- | }
- | }
- |}
- """.stripMargin))
+class UserMetricsSpec extends BaseKamonSpec("user-metrics-spec") {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ | default-collection-context-buffer-size = 10
+ |}
+ """.stripMargin)
"the UserMetrics extension" should {
+
"allow registering a fully configured Histogram and get the same Histogram if registering again" in {
- val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
- val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
+ val histogramA = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
+ val histogramB = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
histogramA shouldBe theSameInstanceAs(histogramB)
}
"return the original Histogram when registering a fully configured Histogram for second time but with different settings" in {
- val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
- val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Fine, 50000L)
+ val histogramA = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
+ val histogramB = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 50000, 2))
histogramA shouldBe theSameInstanceAs(histogramB)
}
"allow registering a Histogram that takes the default configuration from the kamon.metrics.precision settings" in {
- Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration")
+ kamon.userMetrics.histogram("histogram-with-default-configuration")
}
"allow registering a Counter and get the same Counter if registering again" in {
- val counterA = Kamon(UserMetrics).registerCounter("counter")
- val counterB = Kamon(UserMetrics).registerCounter("counter")
+ val counterA = kamon.userMetrics.counter("counter")
+ val counterB = kamon.userMetrics.counter("counter")
counterA shouldBe theSameInstanceAs(counterB)
}
"allow registering a fully configured MinMaxCounter and get the same MinMaxCounter if registering again" in {
- val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
- val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
+ val minMaxCounterA = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
+ val minMaxCounterB = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB)
}
"return the original MinMaxCounter when registering a fully configured MinMaxCounter for second time but with different settings" in {
- val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
- val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Fine, 5000L, 1 second)
+ val minMaxCounterA = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
+ val minMaxCounterB = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 50000, 2), 1 second)
minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB)
}
"allow registering a MinMaxCounter that takes the default configuration from the kamon.metrics.precision settings" in {
- Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-default-configuration")
+ kamon.userMetrics.minMaxCounter("min-max-counter-with-default-configuration")
}
"allow registering a fully configured Gauge and get the same Gauge if registering again" in {
- val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) {
+ val gaugeA = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
- }
+ })
- val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) {
+ val gaugeB = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
- }
+ })
gaugeA shouldBe theSameInstanceAs(gaugeB)
}
"return the original Gauge when registering a fully configured Gauge for second time but with different settings" in {
- val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) {
+ val gaugeA = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
- }
+ })
- val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Fine, 5000L, 1 second) {
+ val gaugeB = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
- }
+ })
gaugeA shouldBe theSameInstanceAs(gaugeB)
}
"allow registering a Gauge that takes the default configuration from the kamon.metrics.precision settings" in {
- Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") {
+ kamon.userMetrics.gauge("gauge-with-default-configuration", {
() ⇒ 2L
- }
+ })
}
"allow un-registering user metrics" in {
- val metricsExtension = Kamon(Metrics)
- Kamon(UserMetrics).registerCounter("counter-for-remove")
- Kamon(UserMetrics).registerHistogram("histogram-for-remove")
- Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-for-remove")
- Kamon(UserMetrics).registerGauge("gauge-for-remove") { () ⇒ 2L }
-
- metricsExtension.storage.keys should contain(UserCounter("counter-for-remove"))
- metricsExtension.storage.keys should contain(UserHistogram("histogram-for-remove"))
- metricsExtension.storage.keys should contain(UserMinMaxCounter("min-max-counter-for-remove"))
- metricsExtension.storage.keys should contain(UserGauge("gauge-for-remove"))
-
- Kamon(UserMetrics).removeCounter("counter-for-remove")
- Kamon(UserMetrics).removeHistogram("histogram-for-remove")
- Kamon(UserMetrics).removeMinMaxCounter("min-max-counter-for-remove")
- Kamon(UserMetrics).removeGauge("gauge-for-remove")
-
- metricsExtension.storage.keys should not contain (UserCounter("counter-for-remove"))
- metricsExtension.storage.keys should not contain (UserHistogram("histogram-for-remove"))
- metricsExtension.storage.keys should not contain (UserMinMaxCounter("min-max-counter-for-remove"))
- metricsExtension.storage.keys should not contain (UserGauge("gauge-for-remove"))
- }
-
- "include all the registered metrics in the a tick snapshot and reset all recorders" in {
- Kamon(Metrics).subscribe(UserHistograms, "*", testActor, permanently = true)
- Kamon(Metrics).subscribe(UserCounters, "*", testActor, permanently = true)
- Kamon(Metrics).subscribe(UserMinMaxCounters, "*", testActor, permanently = true)
- Kamon(Metrics).subscribe(UserGauges, "*", testActor, permanently = true)
-
- val histogramWithSettings = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
- val histogramWithDefaultConfiguration = Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration")
- val counter = Kamon(UserMetrics).registerCounter("counter")
- val minMaxCounterWithSettings = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
- val gauge = Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") { () ⇒ 2L }
-
- // lets put some values on those metrics
- histogramWithSettings.record(10)
- histogramWithSettings.record(20, 100)
- histogramWithDefaultConfiguration.record(40)
-
- counter.increment()
- counter.increment(16)
-
- minMaxCounterWithSettings.increment(43)
- minMaxCounterWithSettings.decrement()
-
- gauge.record(15)
-
- Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics
- val firstSnapshot = expectMsgType[TickMetricSnapshot].metrics
-
- firstSnapshot.keys should contain allOf (
- UserHistogram("histogram-with-settings"),
- UserHistogram("histogram-with-default-configuration"))
-
- firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (10)
- firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (20)
- firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(101)
- firstSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf (
- MutableRecord(10, 1),
- MutableRecord(20, 100))
-
- firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (40)
- firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (40)
- firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1)
- firstSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only (
- MutableRecord(40, 1))
-
- firstSnapshot(UserCounter("counter")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(17)
-
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (43)
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3)
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf (
- MutableRecord(0, 1), // min
- MutableRecord(42, 1), // current
- MutableRecord(43, 1)) // max
-
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0)
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3)
- firstSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only (
- MutableRecord(0, 3)) // min, max and current
-
- firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (15)
- firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (15)
- firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1)
- firstSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only (
- MutableRecord(15, 1)) // only the manually recorded value
-
- Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics
- val secondSnapshot = expectMsgType[TickMetricSnapshot].metrics
-
- secondSnapshot.keys should contain allOf (
- UserHistogram("histogram-with-settings"),
- UserHistogram("histogram-with-default-configuration"))
-
- secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0)
- secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(0)
- secondSnapshot(UserHistogram("histogram-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream shouldBe empty
-
- secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0)
- secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(0)
- secondSnapshot(UserHistogram("histogram-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream shouldBe empty
-
- secondSnapshot(UserCounter("counter")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(0)
-
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (42)
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (42)
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3)
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-settings")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only (
- MutableRecord(42, 3)) // max
-
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0)
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(3)
- secondSnapshot(UserMinMaxCounter("min-max-counter-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain only (
- MutableRecord(0, 3)) // min, max and current
-
- secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (0)
- secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(0)
- secondSnapshot(UserGauge("gauge-with-default-configuration")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream shouldBe empty
-
- Kamon(Metrics).unsubscribe(testActor)
- }
-
- "generate a snapshot that can be merged with another" in {
- val buffer = system.actorOf(TickMetricSnapshotBuffer.props(1 hours, testActor))
- Kamon(Metrics).subscribe(UserHistograms, "*", buffer, permanently = true)
- Kamon(Metrics).subscribe(UserCounters, "*", buffer, permanently = true)
- Kamon(Metrics).subscribe(UserMinMaxCounters, "*", buffer, permanently = true)
- Kamon(Metrics).subscribe(UserGauges, "*", buffer, permanently = true)
-
- val histogram = Kamon(UserMetrics).registerHistogram("histogram-for-merge")
- val counter = Kamon(UserMetrics).registerCounter("counter-for-merge")
- val minMaxCounter = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-for-merge")
- val gauge = Kamon(UserMetrics).registerGauge("gauge-for-merge") { () ⇒ 10L }
-
- histogram.record(100)
- counter.increment(10)
- minMaxCounter.increment(50)
- minMaxCounter.decrement(10)
- gauge.record(50)
-
- Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics
- Thread.sleep(2000) // Make sure that the snapshots are taken before proceeding
-
- val extraCounter = Kamon(UserMetrics).registerCounter("extra-counter")
- histogram.record(200)
- extraCounter.increment(20)
- minMaxCounter.increment(40)
- minMaxCounter.decrement(50)
- gauge.record(70)
-
- Kamon(Metrics).subscriptions ! Subscriptions.FlushMetrics
- Thread.sleep(2000) // Make sure that the metrics are buffered.
- buffer ! TickMetricSnapshotBuffer.FlushBuffer
- val snapshot = expectMsgType[TickMetricSnapshot].metrics
-
- snapshot.keys should contain(UserHistogram("histogram-for-merge"))
-
- snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (100)
- snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (200)
- snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(2)
- snapshot(UserHistogram("histogram-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf (
- MutableRecord(100, 1),
- MutableRecord(200, 1))
-
- snapshot(UserCounter("counter-for-merge")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(10)
- snapshot(UserCounter("extra-counter")).metrics(Count).asInstanceOf[Counter.Snapshot].count should be(20)
-
- snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (0)
- snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (80)
- snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(6)
- snapshot(UserMinMaxCounter("min-max-counter-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf (
- MutableRecord(0, 1), // min in first snapshot
- MutableRecord(30, 2), // min and current in second snapshot
- MutableRecord(40, 1), // current in first snapshot
- MutableRecord(50, 1), // max in first snapshot
- MutableRecord(80, 1)) // max in second snapshot
-
- snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].min shouldBe (50)
- snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].max shouldBe (70)
- snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(2)
- snapshot(UserGauge("gauge-for-merge")).metrics(RecordedValues).asInstanceOf[Histogram.Snapshot].recordsIterator.toStream should contain allOf (
- MutableRecord(50, 1),
- MutableRecord(70, 1))
-
- Kamon(Metrics).unsubscribe(testActor)
+ val counter = kamon.userMetrics.counter("counter-for-remove")
+ val histogram = kamon.userMetrics.histogram("histogram-for-remove")
+ val minMaxCounter = kamon.userMetrics.minMaxCounter("min-max-counter-for-remove")
+ val gauge = kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L })
+
+ kamon.userMetrics.removeCounter("counter-for-remove")
+ kamon.userMetrics.removeHistogram("histogram-for-remove")
+ kamon.userMetrics.removeMinMaxCounter("min-max-counter-for-remove")
+ kamon.userMetrics.removeGauge("gauge-for-remove")
+
+ counter should not be (theSameInstanceAs(kamon.userMetrics.counter("counter-for-remove")))
+ histogram should not be (theSameInstanceAs(kamon.userMetrics.histogram("histogram-for-remove")))
+ minMaxCounter should not be (theSameInstanceAs(kamon.userMetrics.minMaxCounter("min-max-counter-for-remove")))
+ gauge should not be (theSameInstanceAs(kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L })))
}
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala
index 1a93e1f6..500a69c5 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala
@@ -2,7 +2,6 @@ package kamon.metric.instrument
import java.nio.LongBuffer
-import kamon.metric.CollectionContext
import org.scalatest.{ Matchers, WordSpec }
class CounterSpec extends WordSpec with Matchers {
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
index 9192d999..bd39652c 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
@@ -1,72 +1,62 @@
package kamon.metric.instrument
import java.util.concurrent.atomic.AtomicLong
-
-import akka.actor.ActorSystem
-import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metric.{ Metrics, Scale, CollectionContext }
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
-class GaugeSpec extends WordSpecLike with Matchers {
- implicit val system = ActorSystem("gauge-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | flush-interval = 1 hour
- | default-collection-context-buffer-size = 10
- | precision {
- | default-gauge-precision {
- | refresh-interval = 100 milliseconds
- | highest-trackable-value = 999999999
- | significant-value-digits = 2
- | }
- | }
- |}
- """.stripMargin))
+class GaugeSpec extends BaseKamonSpec("gauge-spec") {
"a Gauge" should {
- "automatically record the current value using the configured refresh-interval" in {
- val numberOfValuesRecorded = new AtomicLong(0)
- val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) }
-
+ "automatically record the current value using the configured refresh-interval" in new GaugeFixture {
+ val (numberOfValuesRecorded, gauge) = createGauge()
Thread.sleep(1.second.toMillis)
+
numberOfValuesRecorded.get() should be(10L +- 1L)
gauge.cleanup
}
- "stop automatically recording after a call to cleanup" in {
- val numberOfValuesRecorded = new AtomicLong(0)
- val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) }
-
+ "stop automatically recording after a call to cleanup" in new GaugeFixture {
+ val (numberOfValuesRecorded, gauge) = createGauge()
Thread.sleep(1.second.toMillis)
+
gauge.cleanup
numberOfValuesRecorded.get() should be(10L +- 1L)
Thread.sleep(1.second.toMillis)
+
numberOfValuesRecorded.get() should be(10L +- 1L)
}
- "produce a Histogram snapshot including all the recorded values" in {
- val numberOfValuesRecorded = new AtomicLong(0)
- val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) }
+ "produce a Histogram snapshot including all the recorded values" in new GaugeFixture {
+ val (numberOfValuesRecorded, gauge) = createGauge()
Thread.sleep(1.second.toMillis)
gauge.cleanup
- val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext)
+ val snapshot = gauge.collect(kamon.metrics.buildDefaultCollectionContext)
snapshot.numberOfMeasurements should be(10L +- 1L)
snapshot.min should be(1)
snapshot.max should be(10L +- 1L)
}
- "not record the current value when doing a collection" in {
- val numberOfValuesRecorded = new AtomicLong(0)
- val gauge = Gauge(Histogram.Precision.Normal, 10000L, Scale.Unit, 1 hour, system)(() ⇒ numberOfValuesRecorded.addAndGet(1))
-
- val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext)
+ "not record the current value when doing a collection" in new GaugeFixture {
+ val (numberOfValuesRecorded, gauge) = createGauge(10 seconds)
+ val snapshot = gauge.collect(kamon.metrics.buildDefaultCollectionContext)
snapshot.numberOfMeasurements should be(0)
numberOfValuesRecorded.get() should be(0)
}
}
+
+ trait GaugeFixture {
+ def createGauge(refreshInterval: FiniteDuration = 100 millis): (AtomicLong, Gauge) = {
+ val recordedValuesCounter = new AtomicLong(0)
+ val gauge = Gauge(DynamicRange(1, 100, 2), refreshInterval, kamon.metrics.settings.refreshScheduler, {
+ () ⇒ recordedValuesCounter.addAndGet(1)
+ })
+
+ (recordedValuesCounter, gauge)
+ }
+
+ }
}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala
index c3060d4a..9a50e149 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala
@@ -18,22 +18,13 @@ package kamon.metric.instrument
import java.nio.LongBuffer
-import com.typesafe.config.ConfigFactory
-import kamon.metric.CollectionContext
+import kamon.metric.instrument.Histogram.DynamicRange
import org.scalatest.{ Matchers, WordSpec }
import scala.util.Random
class HistogramSpec extends WordSpec with Matchers {
- val histogramConfig = ConfigFactory.parseString(
- """
- |
- |highest-trackable-value = 100000
- |significant-value-digits = 2
- |
- """.stripMargin)
-
"a Histogram" should {
"allow record values within the configured range" in new HistogramFixture {
histogram.record(1000)
@@ -109,7 +100,7 @@ class HistogramSpec extends WordSpec with Matchers {
val buffer: LongBuffer = LongBuffer.allocate(10000)
}
- val histogram = Histogram.fromConfig(histogramConfig)
+ val histogram = Histogram(DynamicRange(1, 100000, 2))
def takeSnapshot(): Histogram.Snapshot = histogram.collect(collectionContext)
}
@@ -119,17 +110,20 @@ class HistogramSpec extends WordSpec with Matchers {
val buffer: LongBuffer = LongBuffer.allocate(10000)
}
- val controlHistogram = Histogram.fromConfig(histogramConfig)
- val histogramA = Histogram.fromConfig(histogramConfig)
- val histogramB = Histogram.fromConfig(histogramConfig)
+ val controlHistogram = Histogram(DynamicRange(1, 100000, 2))
+ val histogramA = Histogram(DynamicRange(1, 100000, 2))
+ val histogramB = Histogram(DynamicRange(1, 100000, 2))
+
+ def takeSnapshotFrom(histogram: Histogram): InstrumentSnapshot = histogram.collect(collectionContext)
- def takeSnapshotFrom(histogram: Histogram): Histogram.Snapshot = histogram.collect(collectionContext)
+ def assertEquals(left: InstrumentSnapshot, right: InstrumentSnapshot): Unit = {
+ val leftSnapshot = left.asInstanceOf[Histogram.Snapshot]
+ val rightSnapshot = right.asInstanceOf[Histogram.Snapshot]
- def assertEquals(left: Histogram.Snapshot, right: Histogram.Snapshot): Unit = {
- left.numberOfMeasurements should equal(right.numberOfMeasurements)
- left.min should equal(right.min)
- left.max should equal(right.max)
- left.recordsIterator.toStream should contain theSameElementsAs (right.recordsIterator.toStream)
+ leftSnapshot.numberOfMeasurements should equal(rightSnapshot.numberOfMeasurements)
+ leftSnapshot.min should equal(rightSnapshot.min)
+ leftSnapshot.max should equal(rightSnapshot.max)
+ leftSnapshot.recordsIterator.toStream should contain theSameElementsAs (rightSnapshot.recordsIterator.toStream)
}
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
index 2c11adc3..7a3d7aa3 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
@@ -19,19 +19,11 @@ import java.nio.LongBuffer
import akka.actor._
import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import kamon.metric.CollectionContext
-import kamon.metric.instrument.Histogram.MutableRecord
-import org.scalatest.{ Matchers, WordSpecLike }
-
-class MinMaxCounterSpec extends WordSpecLike with Matchers {
- implicit val system = ActorSystem("min-max-counter-spec")
- val minMaxCounterConfig = ConfigFactory.parseString(
- """
- |refresh-interval = 1 hour
- |highest-trackable-value = 1000
- |significant-value-digits = 2
- """.stripMargin)
+import kamon.metric.instrument.Histogram.{ DynamicRange, MutableRecord }
+import kamon.testkit.BaseKamonSpec
+import scala.concurrent.duration._
+
+class MinMaxCounterSpec extends BaseKamonSpec("min-max-counter-spec") {
"the MinMaxCounter" should {
"track ascending tendencies" in new MinMaxCounterFixture {
@@ -104,7 +96,7 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers {
workers foreach (_ ! "increment")
for (refresh ← 1 to 1000) {
collectCounterSnapshot()
- Thread.sleep(10)
+ Thread.sleep(1)
}
monitor.expectNoMsg()
@@ -117,7 +109,7 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers {
val buffer: LongBuffer = LongBuffer.allocate(64)
}
- val mmCounter = MinMaxCounter.fromConfig(minMaxCounterConfig, system).asInstanceOf[PaddedMinMaxCounter]
+ val mmCounter = MinMaxCounter(DynamicRange(1, 1000, 2), 1 hour, kamon.metrics.settings.refreshScheduler)
mmCounter.cleanup // cancel the refresh schedule
def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext)
diff --git a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
new file mode 100644
index 00000000..20fc3ed5
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
@@ -0,0 +1,34 @@
+package kamon.testkit
+
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import akka.actor.ActorSystem
+import com.typesafe.config.{ Config, ConfigFactory }
+import kamon.Kamon
+import kamon.metric.{ SubscriptionsDispatcher, EntitySnapshot, MetricsExtensionImpl }
+import kamon.trace.TraceContext
+import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
+
+abstract class BaseKamonSpec(actorSystemName: String) extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll {
+ lazy val kamon = Kamon(actorSystemName, config)
+ lazy val collectionContext = kamon.metrics.buildDefaultCollectionContext
+ implicit lazy val system: ActorSystem = kamon.actorSystem
+
+ def config: Config =
+ ConfigFactory.load()
+
+ def newContext(name: String): TraceContext =
+ kamon.tracer.newContext(name)
+
+ def newContext(name: String, token: String): TraceContext =
+ kamon.tracer.newContext(name, token)
+
+ def takeSnapshotOf(name: String, category: String): EntitySnapshot = {
+ val recorder = kamon.metrics.find(name, category).get
+ recorder.collect(collectionContext)
+ }
+
+ def flushSubscriptions(): Unit =
+ system.actorSelection("/user/kamon/subscriptions-dispatcher") ! SubscriptionsDispatcher.Tick
+
+ override protected def afterAll(): Unit = system.shutdown()
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
index cda9cad7..0cb4ce34 100644
--- a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
@@ -16,58 +16,40 @@
package kamon.trace
-import akka.actor.ActorSystem
-import akka.testkit.{ ImplicitSender, TestKitBase }
import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
-class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("simple-trace-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = [ "non-tracked-trace"]
- | }
- | }
- | ]
- | precision {
- | default-histogram-precision {
- | highest-trackable-value = 3600000000000
- | significant-value-digits = 2
- | }
- |
- | default-min-max-counter-precision {
- | refresh-interval = 1 second
- | highest-trackable-value = 999999999
- | significant-value-digits = 2
- | }
- | }
- |}
- |
- |kamon.trace {
- | level = simple-trace
- | sampling = all
- |}
- """.stripMargin))
+class SimpleTraceSpec extends BaseKamonSpec("simple-trace-spec") {
+
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | metric {
+ | tick-interval = 1 hour
+ | }
+ |
+ | trace {
+ | level-of-detail = simple-trace
+ | sampling = all
+ | }
+ |}
+ """.stripMargin)
"the simple tracing" should {
"send a TraceInfo when the trace has finished and all segments are finished" in {
- Kamon(Trace)(system).subscribe(testActor)
+ Kamon(Tracer)(system).subscribe(testActor)
- TraceRecorder.withNewTraceContext("simple-trace-without-segments") {
- TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish()
- TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test").finish()
- TraceRecorder.finish()
+ TraceContext.withContext(newContext("simple-trace-without-segments")) {
+ TraceContext.currentContext.startSegment("segment-one", "test-segment", "test").finish()
+ TraceContext.currentContext.startSegment("segment-two", "test-segment", "test").finish()
+ TraceContext.currentContext.finish()
}
val traceInfo = expectMsgType[TraceInfo]
- Kamon(Trace)(system).unsubscribe(testActor)
+ Kamon(Tracer)(system).unsubscribe(testActor)
traceInfo.name should be("simple-trace-without-segments")
traceInfo.segments.size should be(2)
@@ -76,12 +58,12 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I
}
"incubate the tracing context if there are open segments after finishing" in {
- Kamon(Trace)(system).subscribe(testActor)
+ Kamon(Tracer)(system).subscribe(testActor)
- val secondSegment = TraceRecorder.withNewTraceContext("simple-trace-without-segments") {
- TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish()
- val segment = TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test")
- TraceRecorder.finish()
+ val secondSegment = TraceContext.withContext(newContext("simple-trace-without-segments")) {
+ TraceContext.currentContext.startSegment("segment-one", "test-segment", "test").finish()
+ val segment = TraceContext.currentContext.startSegment("segment-two", "test-segment", "test")
+ TraceContext.currentContext.finish()
segment
}
@@ -90,7 +72,7 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I
within(10 seconds) {
val traceInfo = expectMsgType[TraceInfo]
- Kamon(Trace)(system).unsubscribe(testActor)
+ Kamon(Tracer)(system).unsubscribe(testActor)
traceInfo.name should be("simple-trace-without-segments")
traceInfo.segments.size should be(2)
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
index 0875deff..9d7725b7 100644
--- a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
@@ -1,94 +1,80 @@
package kamon.trace
-import akka.actor.ActorSystem
-import akka.testkit.{ ImplicitSender, TestKitBase }
import com.typesafe.config.ConfigFactory
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.testkit.BaseKamonSpec
-class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = [ "non-tracked-trace"]
- | }
- | }
- | ]
- | precision {
- | default-histogram-precision {
- | highest-trackable-value = 3600000000000
- | significant-value-digits = 2
- | }
- |
- | default-min-max-counter-precision {
- | refresh-interval = 1 second
- | highest-trackable-value = 999999999
- | significant-value-digits = 2
- | }
- | }
- |}
- """.stripMargin))
+class TraceContextManipulationSpec extends BaseKamonSpec("trace-metrics-spec") {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ |
+ | filters {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ |}
+ """.stripMargin)
- "the TraceRecorder api" should {
+ "the TraceContext api" should {
"allow starting a trace within a specified block of code, and only within that block of code" in {
- val createdContext = TraceRecorder.withNewTraceContext("start-context") {
- TraceRecorder.currentContext should not be empty
- TraceRecorder.currentContext
+ val createdContext = TraceContext.withContext(newContext("start-context")) {
+ TraceContext.currentContext should not be empty
+ TraceContext.currentContext
}
- TraceRecorder.currentContext shouldBe empty
+ TraceContext.currentContext shouldBe empty
createdContext.name shouldBe ("start-context")
}
"allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in {
- val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) {
- TraceRecorder.currentContext should not be empty
- TraceRecorder.currentContext
+ val createdContext = TraceContext.withContext(newContext("start-context-with-token", "token-1")) {
+ TraceContext.currentContext should not be empty
+ TraceContext.currentContext
}
- TraceRecorder.currentContext shouldBe empty
+ TraceContext.currentContext shouldBe empty
createdContext.name shouldBe ("start-context-with-token")
createdContext.token should be("token-1")
}
"allow providing a TraceContext and make it available within a block of code" in {
- val createdContext = TraceRecorder.withNewTraceContext("manually-provided-trace-context") { TraceRecorder.currentContext }
+ val createdContext = newContext("manually-provided-trace-context")
- TraceRecorder.currentContext shouldBe empty
- TraceRecorder.withTraceContext(createdContext) {
- TraceRecorder.currentContext should be(createdContext)
+ TraceContext.currentContext shouldBe empty
+ TraceContext.withContext(createdContext) {
+ TraceContext.currentContext should be(createdContext)
}
- TraceRecorder.currentContext shouldBe empty
+ TraceContext.currentContext shouldBe empty
}
"allow renaming a trace" in {
- val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") {
- TraceRecorder.rename("renamed-trace")
- TraceRecorder.currentContext
+ val createdContext = TraceContext.withContext(newContext("trace-before-rename")) {
+ TraceContext.currentContext.rename("renamed-trace")
+ TraceContext.currentContext
}
- TraceRecorder.currentContext shouldBe empty
+ TraceContext.currentContext shouldBe empty
createdContext.name shouldBe ("renamed-trace")
}
"allow creating a segment within a trace" in {
- val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") {
- val segment = TraceRecorder.currentContext.startSegment("segment-1", "segment-1-category", "segment-library")
- TraceRecorder.currentContext
+ val createdContext = TraceContext.withContext(newContext("trace-with-segments")) {
+ val segment = TraceContext.currentContext.startSegment("segment-1", "segment-1-category", "segment-library")
+ TraceContext.currentContext
}
- TraceRecorder.currentContext shouldBe empty
+ TraceContext.currentContext shouldBe empty
createdContext.name shouldBe ("trace-with-segments")
}
"allow renaming a segment" in {
- TraceRecorder.withNewTraceContext("trace-with-renamed-segment") {
- val segment = TraceRecorder.currentContext.startSegment("original-segment-name", "segment-label", "segment-library")
+ TraceContext.withContext(newContext("trace-with-renamed-segment")) {
+ val segment = TraceContext.currentContext.startSegment("original-segment-name", "segment-label", "segment-library")
segment.name should be("original-segment-name")
segment.rename("new-segment-name")
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala
index f2b25820..8bacca83 100644
--- a/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala
@@ -16,24 +16,21 @@
package kamon.trace
-import akka.actor.ActorSystem
-import akka.testkit.TestKit
+import kamon.testkit.BaseKamonSpec
import kamon.trace.TraceLocal.AvailableToMdc
import kamon.trace.logging.MdcKeysSupport
import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
+import org.scalatest.OptionValues
import org.slf4j.MDC
-class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordSpecLike with Matchers
- with PatienceConfiguration with OptionValues with MdcKeysSupport {
-
+class TraceLocalSpec extends BaseKamonSpec("trace-local-spec") with PatienceConfiguration with OptionValues with MdcKeysSupport {
val SampleTraceLocalKeyAvailableToMDC = AvailableToMdc("someKey")
object SampleTraceLocalKey extends TraceLocal.TraceLocalKey { type ValueType = String }
"the TraceLocal storage" should {
"allow storing and retrieving values" in {
- TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local") {
+ TraceContext.withContext(newContext("store-and-retrieve-trace-local")) {
val testString = "Hello World"
TraceLocal.store(SampleTraceLocalKey)(testString)
@@ -42,7 +39,7 @@ class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordS
}
"return None when retrieving a non existent key" in {
- TraceRecorder.withNewTraceContext("non-existent-key") {
+ TraceContext.withContext(newContext("non-existent-key")) {
TraceLocal.retrieve(SampleTraceLocalKey) should equal(None)
}
}
@@ -53,22 +50,22 @@ class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordS
"be attached to the TraceContext when it is propagated" in {
val testString = "Hello World"
- val testContext = TraceRecorder.withNewTraceContext("manually-propagated-trace-local") {
+ val testContext = TraceContext.withContext(newContext("manually-propagated-trace-local")) {
TraceLocal.store(SampleTraceLocalKey)(testString)
TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString)
- TraceRecorder.currentContext
+ TraceContext.currentContext
}
/** No TraceLocal should be available here */
TraceLocal.retrieve(SampleTraceLocalKey) should equal(None)
- TraceRecorder.withTraceContext(testContext) {
+ TraceContext.withContext(testContext) {
TraceLocal.retrieve(SampleTraceLocalKey).value should equal(testString)
}
}
"allow retrieve a value from the MDC when was created a key with AvailableToMdc(cool-key)" in {
- TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local-and-copy-to-mdc") {
+ TraceContext.withContext(newContext("store-and-retrieve-trace-local-and-copy-to-mdc")) {
val testString = "Hello MDC"
TraceLocal.store(SampleTraceLocalKeyAvailableToMDC)(testString)
@@ -81,7 +78,7 @@ class TraceLocalSpec extends TestKit(ActorSystem("trace-local-spec")) with WordS
}
"allow retrieve a value from the MDC when was created a key with AvailableToMdc.storeForMdc(String, String)" in {
- TraceRecorder.withNewTraceContext("store-and-retrieve-trace-local-and-copy-to-mdc") {
+ TraceContext.withContext(newContext("store-and-retrieve-trace-local-and-copy-to-mdc")) {
val testString = "Hello MDC"
TraceLocal.storeForMdc("someKey", testString)
diff --git a/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala b/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala
index 83992e61..ab98d0ac 100644
--- a/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala
+++ b/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala
@@ -40,6 +40,13 @@ class GlobPathFilterSpec extends WordSpecLike with Matchers {
filter.accept("/user/something/otherActor") shouldBe false
}
+ "match all expressions in the same levelss" in {
+ val filter = new GlobPathFilter("**")
+
+ filter.accept("GET: /ping") shouldBe true
+ filter.accept("GET: /ping/pong") shouldBe true
+ }
+
"match all expressions and crosses the path boundaries" in {
val filter = new GlobPathFilter("/user/actor-**")
@@ -51,7 +58,7 @@ class GlobPathFilterSpec extends WordSpecLike with Matchers {
filter.accept("/user/something/otherActor") shouldBe false
}
- "match exactly one characterr" in {
+ "match exactly one character" in {
val filter = new GlobPathFilter("/user/actor-?")
filter.accept("/user/actor-1") shouldBe true
diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala
index b7050c59..2f77df95 100644
--- a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala
+++ b/kamon-testkit/src/main/scala/testkit/AkkaExtensionSwap.scala
@@ -14,11 +14,12 @@
* =========================================================================================
*/
-package kamon
+package testkit
-import akka.actor.{ Extension, ActorSystem, ExtensionId }
import java.util.concurrent.ConcurrentHashMap
+import akka.actor.{ ActorSystem, Extension, ExtensionId }
+
object AkkaExtensionSwap {
def swap(system: ActorSystem, key: ExtensionId[_], value: Extension): Unit = {
val extensionsField = system.getClass.getDeclaredField("extensions")
diff --git a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
index 825cc718..9e736971 100644
--- a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
+++ b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
@@ -17,7 +17,7 @@
package akka.testkit
import org.aspectj.lang.annotation._
-import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceRecorder }
+import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceContext }
import org.aspectj.lang.ProceedingJoinPoint
import akka.testkit.TestActor.RealMessage
@@ -46,7 +46,7 @@ class TestProbeInstrumentation {
case _ ⇒ EmptyTraceContext
}
- TraceRecorder.withTraceContext(traceContext) {
+ TraceContext.withContext(traceContext) {
pjp.proceed
}
}