aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-03 14:36:42 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-03 14:36:18 -0300
commit29068fc70a3e5a17a630c2c7fff951572bb5fa21 (patch)
tree7ec2632f36e9493cb559f510fa3cc3ead7443511
parent4d5803e579e223c4f4f5cb37ab79ca069a007949 (diff)
downloadKamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.tar.gz
Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.tar.bz2
Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.zip
! all: refactor the core metric recording instruments and accomodate UserMetrics
This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module.
-rw-r--r--kamon-core/src/main/java/kamon/util/Example.java8
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml22
-rw-r--r--kamon-core/src/main/resources/reference.conf43
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala (renamed from kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala)62
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala)12
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala (renamed from kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala)59
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala (renamed from kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala)17
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala (renamed from kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala)34
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala (renamed from kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala)29
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala)33
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala89
-rw-r--r--kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala88
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala75
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala (renamed from kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala)18
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Scale.scala (renamed from kamon-core/src/main/scala/kamon/metrics/Scale.scala)2
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala (renamed from kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala)13
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala77
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala139
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala59
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala78
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala246
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala116
-rw-r--r--kamon-core/src/main/scala/kamon/metric/package.scala (renamed from kamon-core/src/main/scala/kamon/metrics/package.scala)7
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala70
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala52
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala71
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/Metrics.scala121
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala66
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala52
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala78
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala58
-rw-r--r--kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala11
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala4
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala (renamed from kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala)18
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala (renamed from kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala)11
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala (renamed from kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala)17
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala (renamed from kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala)15
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala (renamed from kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala)15
-rw-r--r--kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala202
-rw-r--r--kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala (renamed from kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala)14
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala (renamed from kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala)66
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala92
-rw-r--r--kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala278
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala55
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala70
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala130
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala108
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala172
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala78
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala72
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala110
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala95
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala2
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala47
-rw-r--r--kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala54
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala6
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala2
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala8
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala9
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/package.scala32
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala5
-rw-r--r--kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala6
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala7
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala12
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala15
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala9
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/TestServer.scala11
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala2
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala35
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala40
-rw-r--r--project/Settings.scala2
72 files changed, 2448 insertions, 1391 deletions
diff --git a/kamon-core/src/main/java/kamon/util/Example.java b/kamon-core/src/main/java/kamon/util/Example.java
new file mode 100644
index 00000000..a5031182
--- /dev/null
+++ b/kamon-core/src/main/java/kamon/util/Example.java
@@ -0,0 +1,8 @@
+package kamon.util;
+
+public class Example {
+
+ public static void main(String args[]) {
+
+ }
+}
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 3f7dd42d..e1edaed9 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -3,23 +3,23 @@
<aspectj>
<aspects>
<!-- Actors -->
- <aspect name="akka.instrumentation.RepointableActorRefTraceContextMixin"/>
- <aspect name="akka.instrumentation.SystemMessageTraceContextMixin"/>
- <aspect name="akka.instrumentation.ActorSystemMessagePassingTracing"/>
- <aspect name="akka.instrumentation.EnvelopeTraceContextMixin"/>
- <aspect name="akka.instrumentation.ActorCellMetricsMixin"/>
- <aspect name="akka.instrumentation.BehaviourInvokeTracing"/>
- <aspect name="kamon.instrumentation.ActorLoggingTracing"/>
+ <aspect name="akka.instrumentation.TraceContextIntoRepointableActorRefMixin"/>
+ <aspect name="akka.instrumentation.TraceContextIntoSystemMessageMixin"/>
+ <aspect name="akka.instrumentation.ActorSystemMessageInstrumentation"/>
+ <aspect name="akka.instrumentation.TraceContextIntoEnvelopeMixin"/>
+ <aspect name="akka.instrumentation.ActorCellMetricsIntoActorCellMixin"/>
+ <aspect name="akka.instrumentation.ActorCellInstrumentation"/>
+ <aspect name="akka.instrumentation.ActorLoggingInstrumentation"/>
<!-- Dispatchers -->
- <aspect name="akka.instrumentation.DispatcherTracing"/>
- <aspect name="akka.instrumentation.DispatcherMetricsMixin"/>
+ <aspect name="akka.instrumentation.DispatcherInstrumentation"/>
+ <aspect name="akka.instrumentation.DispatcherMetricCollectionInfoIntoDispatcherMixin"/>
<!-- Futures -->
- <aspect name="kamon.instrumentation.FutureTracing"/>
+ <aspect name="kamon.instrumentation.scala.FutureInstrumentation"/>
<!-- Patterns -->
- <aspect name="akka.instrumentation.AskPatternTracing"/>
+ <aspect name="akka.instrumentation.AskPatternInstrumentation"/>
</aspects>
<weaver options="-XmessageHandlerClass:kamon.weaver.logging.KamonWeaverMessageHandler">
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index d2830892..b7f5c70e 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -48,30 +48,33 @@ kamon {
]
precision {
+ default-histogram-precision {
+ highest-trackable-value = 3600000000000
+ significant-value-digits = 2
+ }
+
+ default-min-max-counter-precision {
+ refresh-interval = 100 milliseconds
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+
+ default-gauge-precision {
+ refresh-interval = 100 milliseconds
+ highest-trackable-value = 999999999
+ significant-value-digits = 2
+ }
+
+
actor {
- processing-time {
- highest-trackable-value = 3600000000000
- significant-value-digits = 2
- }
- time-in-mailbox {
- highest-trackable-value = 3600000000000
- significant-value-digits = 2
- }
- mailbox-size {
- highest-trackable-value = 999999999
- significant-value-digits = 2
- }
+ processing-time = ${kamon.metrics.precision.default-histogram-precision}
+ time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision}
+ mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision}
}
trace {
- elapsed-time {
- highest-trackable-value = 3600000000000
- significant-value-digits = 2
- }
- segment {
- highest-trackable-value = 3600000000000
- significant-value-digits = 2
- }
+ elapsed-time = ${kamon.metrics.precision.default-histogram-precision}
+ segment = ${kamon.metrics.precision.default-histogram-precision}
}
dispatcher {
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
index 6db86828..5fce4555 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
@@ -1,34 +1,32 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
package akka.instrumentation
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.ProceedingJoinPoint
import akka.actor._
import akka.dispatch.{ Envelope, MessageDispatcher }
-import kamon.trace._
-import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
-import kamon.metrics.ActorMetrics.ActorMetricRecorder
-import kamon.metrics.instruments.MinMaxCounter
-import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
+import kamon.metric.ActorMetrics.ActorMetricsRecorder
+import kamon.metric.{ ActorMetrics, Metrics }
+import kamon.trace._
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
@Aspect
-class BehaviourInvokeTracing {
+class ActorCellInstrumentation {
@Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
@@ -42,19 +40,6 @@ class BehaviourInvokeTracing {
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
-
- if (cellWithMetrics.actorMetricsRecorder.isDefined) {
- cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
- cellWithMetrics.actorMetricsRecorder.map { am ⇒
- import am.mailboxSize._
- val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect()
-
- record(min)
- record(max)
- record(current)
- }
- }
- }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -75,7 +60,7 @@ class BehaviourInvokeTracing {
am ⇒
am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
- cellWithMetrics.queueSize.decrement()
+ am.mailboxSize.decrement()
}
}
}
@@ -86,7 +71,7 @@ class BehaviourInvokeTracing {
@After("sendingMessageToActorCell(cell)")
def afterSendMessageToActorCell(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(am ⇒ cellWithMetrics.queueSize.increment())
+ cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment())
}
@Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
@@ -110,27 +95,26 @@ class BehaviourInvokeTracing {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.actorMetricsRecorder.map {
- am ⇒ am.errorCounter.record(1L)
+ am ⇒ am.errors.increment()
}
}
}
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
- var actorMetricsRecorder: Option[ActorMetricRecorder] = _
+ var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
var mailboxSizeCollectorCancellable: Cancellable = _
- val queueSize = MinMaxCounter()
}
@Aspect
-class ActorCellMetricsMixin {
+class ActorCellMetricsIntoActorCellMixin {
@DeclareMixin("akka.actor.ActorCell")
def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {}
}
@Aspect
-class EnvelopeTraceContextMixin {
+class TraceContextIntoEnvelopeMixin {
@DeclareMixin("akka.dispatch.Envelope")
def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala
index 85d39d3e..ee9d442f 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala
@@ -1,11 +1,11 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
@@ -14,14 +14,14 @@
* =========================================================================================
*/
-package kamon.instrumentation
+package akka.instrumentation
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.ProceedingJoinPoint
import kamon.trace.{ TraceContextAware, TraceRecorder }
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
@Aspect
-class ActorLoggingTracing {
+class ActorLoggingInstrumentation {
@DeclareMixin("akka.event.Logging.LogEvent+")
def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala
index 7d03d946..9b6b6866 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala
@@ -1,12 +1,44 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package akka.instrumentation
-import org.aspectj.lang.annotation._
import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
+import kamon.trace.{ TraceContextAware, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
-import kamon.trace.{ TraceRecorder, TraceContextAware }
+import org.aspectj.lang.annotation._
+
+@Aspect
+class ActorSystemMessageInstrumentation {
+
+ @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
+ def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
+
+ @Around("systemMessageProcessing(messages)")
+ def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
+ if (messages.nonEmpty) {
+ val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext
+ TraceRecorder.withTraceContext(ctx)(pjp.proceed())
+
+ } else pjp.proceed()
+ }
+}
@Aspect
-class SystemMessageTraceContextMixin {
+class TraceContextIntoSystemMessageMixin {
@DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default
@@ -22,7 +54,7 @@ class SystemMessageTraceContextMixin {
}
@Aspect
-class RepointableActorRefTraceContextMixin {
+class TraceContextIntoRepointableActorRefMixin {
@DeclareMixin("akka.actor.RepointableActorRef")
def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default
@@ -45,21 +77,4 @@ class RepointableActorRefTraceContextMixin {
pjp.proceed()
}
}
-
-}
-
-@Aspect
-class ActorSystemMessagePassingTracing {
-
- @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
- def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
-
- @Around("systemMessageProcessing(messages)")
- def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
- if (messages.nonEmpty) {
- val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext
- TraceRecorder.withTraceContext(ctx)(pjp.proceed())
-
- } else pjp.proceed()
- }
-}
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
index 31ec92a4..3bf13ce2 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
@@ -1,11 +1,11 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
@@ -16,16 +16,17 @@
package akka.instrumentation
-import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect }
-import akka.event.Logging.Warning
-import scala.compat.Platform.EOL
import akka.actor.ActorRefProvider
+import akka.event.Logging.Warning
import akka.pattern.{ AskTimeoutException, PromiseActorRef }
-import kamon.trace.Trace
import kamon.Kamon
+import kamon.trace.Trace
+import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut }
+
+import scala.compat.Platform.EOL
@Aspect
-class AskPatternTracing {
+class AskPatternInstrumentation {
class StackTraceCaptureException extends Throwable
@@ -46,7 +47,7 @@ class AskPatternTracing {
case timeout: AskTimeoutException ⇒
val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL)
- system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing],
+ system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
"Timeout triggered for ask pattern registered at: " + stackString))
}
}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala
index 60cc4ddf..db366e8c 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala
@@ -5,7 +5,7 @@
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
@@ -16,19 +16,21 @@
package akka.instrumentation
-import org.aspectj.lang.annotation._
-import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
-import kamon.metrics.{ Metrics, DispatcherMetrics }
-import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
-import kamon.Kamon
-import akka.actor.{ Cancellable, ActorSystemImpl }
-import scala.concurrent.forkjoin.ForkJoinPool
-import java.util.concurrent.ThreadPoolExecutor
import java.lang.reflect.Method
+import java.util.concurrent.ThreadPoolExecutor
+
+import akka.actor.{ ActorSystemImpl, Cancellable }
+import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher }
import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
+import kamon.Kamon
+import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder
+import kamon.metric.{ DispatcherMetrics, Metrics }
+import org.aspectj.lang.annotation._
+
+import scala.concurrent.forkjoin.ForkJoinPool
@Aspect
-class DispatcherTracing {
+class DispatcherInstrumentation {
@Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}
@@ -45,7 +47,7 @@ class DispatcherTracing {
@AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
- val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
}
@@ -62,7 +64,7 @@ class DispatcherTracing {
@After("onDispatcherStartup(dispatcher)")
def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {
- val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
val metricIdentity = DispatcherMetrics(dispatcher.id)
@@ -90,7 +92,7 @@ class DispatcherTracing {
@After("onDispatcherShutdown(dispatcher)")
def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
- val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo]
dispatcherWithMetrics.dispatcherMetricsRecorder.map {
dispatcher ⇒
@@ -101,16 +103,16 @@ class DispatcherTracing {
}
@Aspect
-class DispatcherMetricsMixin {
+class DispatcherMetricCollectionInfoIntoDispatcherMixin {
@DeclareMixin("akka.dispatch.Dispatcher")
- def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {}
+ def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {}
@DeclareMixin("akka.dispatch.Dispatchers")
def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
}
-trait DispatcherMessageMetrics {
+trait DispatcherMetricCollectionInfo {
var metricIdentity: DispatcherMetrics = _
var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
var dispatcherCollectorCancellable: Cancellable = _
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala
index e5efbc15..e79090a8 100644
--- a/kamon-core/src/main/scala/kamon/metrics/instruments/CounterRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/hdrhistogram/AtomicHistogramFieldsAccessor.scala
@@ -1,4 +1,3 @@
-package kamon.metrics.instruments
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
@@ -15,24 +14,22 @@ package kamon.metrics.instruments
* =========================================================================================
*/
-import kamon.metrics._
-import kamon.metrics.MetricSnapshot.Measurement
+package org.HdrHistogram
-import jsr166e.LongAdder
+import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater }
-class CounterRecorder extends MetricRecorder {
- private val counter = new LongAdder
+trait AtomicHistogramFieldsAccessor {
+ self: AtomicHistogram ⇒
- def record(value: Long): Unit = {
- counter.add(value)
- }
+ def countsArray(): AtomicLongArray = self.counts
- def collect(): MetricSnapshotLike = {
- val sum = counter.sumThenReset()
- MetricSnapshot(InstrumentTypes.Counter, sum, Scale.Unit, Vector(Measurement(1, sum)))
- }
+ def unitMagnitude(): Int = self.unitMagnitude
+
+ def subBucketHalfCount(): Int = self.subBucketHalfCount
+
+ def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude
}
-object CounterRecorder {
- def apply(): CounterRecorder = new CounterRecorder()
-} \ No newline at end of file
+object AtomicHistogramFieldsAccessor {
+ def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala
index 634c94a1..d8f2b620 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/scala/FutureInstrumentation.scala
@@ -1,26 +1,27 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.instrumentation
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.instrumentation.scala
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.ProceedingJoinPoint
import kamon.trace.{ TraceContextAware, TraceRecorder }
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
@Aspect
-class FutureTracing {
+class FutureInstrumentation {
@DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default
diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala
new file mode 100644
index 00000000..bb412f79
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala
@@ -0,0 +1,89 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram }
+
+case class ActorMetrics(name: String) extends MetricGroupIdentity {
+ val category = ActorMetrics
+}
+
+object ActorMetrics extends MetricGroupCategory {
+ val name = "actor"
+
+ case object ProcessingTime extends MetricIdentity { val name = "processing-time" }
+ case object MailboxSize extends MetricIdentity { val name = "mailbox-size" }
+ case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" }
+ case object Errors extends MetricIdentity { val name = "errors" }
+
+ case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter,
+ errors: Counter) extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): ActorMetricSnapshot =
+ ActorMetricSnapshot(
+ processingTime.collect(context),
+ timeInMailbox.collect(context),
+ mailboxSize.collect(context),
+ errors.collect(context))
+
+ def cleanup: Unit = {
+ processingTime.cleanup
+ mailboxSize.cleanup
+ timeInMailbox.cleanup
+ errors.cleanup
+ }
+ }
+
+ case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot,
+ mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = ActorMetricSnapshot
+
+ def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot =
+ ActorMetricSnapshot(
+ processingTime.merge(that.processingTime, context),
+ timeInMailbox.merge(that.timeInMailbox, context),
+ mailboxSize.merge(that.mailboxSize, context),
+ errors.merge(that.errors, context))
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ (ProcessingTime -> processingTime),
+ (MailboxSize -> mailboxSize),
+ (TimeInMailbox -> timeInMailbox),
+ (Errors -> errors))
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = ActorMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): ActorMetricsRecorder = {
+ val settings = config.getConfig("precision.actor")
+
+ val processingTimeConfig = settings.getConfig("processing-time")
+ val timeInMailboxConfig = settings.getConfig("time-in-mailbox")
+ val mailboxSizeConfig = settings.getConfig("mailbox-size")
+
+ new ActorMetricsRecorder(
+ Histogram.fromConfig(processingTimeConfig),
+ Histogram.fromConfig(timeInMailboxConfig),
+ MinMaxCounter.fromConfig(mailboxSizeConfig, system),
+ Counter())
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala
new file mode 100644
index 00000000..fbce783c
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala
@@ -0,0 +1,88 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.instrument.{ Histogram, HdrHistogram }
+
+case class DispatcherMetrics(name: String) extends MetricGroupIdentity {
+ val category = DispatcherMetrics
+}
+
+object DispatcherMetrics extends MetricGroupCategory {
+ val name = "dispatcher"
+
+ case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" }
+ case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" }
+ case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" }
+ case object PoolSize extends MetricIdentity { val name = "pool-size" }
+
+ case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram,
+ queueTaskCount: Histogram, poolSize: Histogram)
+ extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): MetricGroupSnapshot =
+ DispatcherMetricSnapshot(
+ maximumPoolSize.collect(context),
+ runningThreadCount.collect(context),
+ queueTaskCount.collect(context),
+ poolSize.collect(context))
+
+ def cleanup: Unit = {}
+
+ }
+
+ case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot,
+ queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = DispatcherMetricSnapshot
+
+ def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot =
+ DispatcherMetricSnapshot(
+ maximumPoolSize.merge(that.maximumPoolSize, context),
+ runningThreadCount.merge(that.runningThreadCount, context),
+ queueTaskCount.merge(that.queueTaskCount, context),
+ poolSize.merge(that.poolSize, context))
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ (MaximumPoolSize -> maximumPoolSize),
+ (RunningThreadCount -> runningThreadCount),
+ (QueueTaskCount -> queueTaskCount),
+ (PoolSize -> poolSize))
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = DispatcherMetricRecorder
+
+ def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = {
+ val settings = config.getConfig("precision.dispatcher")
+
+ val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size")
+ val runningThreadCountConfig = settings.getConfig("running-thread-count")
+ val queueTaskCountConfig = settings.getConfig("queued-task-count")
+ val poolSizeConfig = settings.getConfig("pool-size")
+
+ new DispatcherMetricRecorder(
+ Histogram.fromConfig(maximumPoolSizeConfig),
+ Histogram.fromConfig(runningThreadCountConfig),
+ Histogram.fromConfig(queueTaskCountConfig),
+ Histogram.fromConfig(poolSizeConfig))
+ }
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
new file mode 100644
index 00000000..325dd216
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
@@ -0,0 +1,75 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import java.nio.{ LongBuffer }
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+
+trait MetricGroupCategory {
+ def name: String
+}
+
+trait MetricGroupIdentity {
+ def name: String
+ def category: MetricGroupCategory
+}
+
+trait MetricIdentity {
+ def name: String
+}
+
+trait CollectionContext {
+ def buffer: LongBuffer
+}
+
+object CollectionContext {
+ def default: CollectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
+ }
+}
+
+trait MetricGroupRecorder {
+ def collect(context: CollectionContext): MetricGroupSnapshot
+ def cleanup: Unit
+}
+
+trait MetricSnapshot {
+ type SnapshotType
+
+ def merge(that: SnapshotType, context: CollectionContext): SnapshotType
+}
+
+trait MetricGroupSnapshot {
+ type GroupSnapshotType
+
+ def metrics: Map[MetricIdentity, MetricSnapshot]
+ def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType
+}
+
+private[kamon] trait MetricRecorder {
+ type SnapshotType <: MetricSnapshot
+
+ def collect(context: CollectionContext): SnapshotType
+ def cleanup: Unit
+}
+
+trait MetricGroupFactory {
+ type GroupRecorder <: MetricGroupRecorder
+ def create(config: Config, system: ActorSystem): GroupRecorder
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index c60babb2..1025f0de 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -14,7 +14,9 @@
* =========================================================================================
*/
-package kamon.metrics
+package kamon.metric
+
+import java.nio.{ LongBuffer, ByteBuffer }
import scala.collection.concurrent.TrieMap
import akka.actor._
@@ -22,8 +24,8 @@ import com.typesafe.config.Config
import kamon.util.GlobPathFilter
import kamon.Kamon
import akka.actor
-import kamon.metrics.Metrics.MetricGroupFilter
-import kamon.metrics.Subscriptions.Subscribe
+import kamon.metric.Metrics.MetricGroupFilter
+import kamon.metric.Subscriptions.Subscribe
import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
@@ -42,7 +44,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
if (shouldTrack(identity))
- Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder])
+ Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder])
else
None
}
@@ -56,7 +58,11 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
}
def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = {
- (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap
+ // TODO: Improve the way in which we are getting the context.
+ val context = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(50000)
+ }
+ (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap
}
def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
@@ -68,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
}
private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
- filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false)
+ filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true)
}
def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala
index 6899490a..2f27c1a3 100644
--- a/kamon-core/src/main/scala/kamon/metrics/Scale.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Scale.scala
@@ -14,7 +14,7 @@
* =========================================================================================
*/
-package kamon.metrics
+package kamon.metric
class Scale(val numericValue: Double) extends AnyVal
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
index c9990229..a9f4c721 100644
--- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
@@ -14,15 +14,15 @@
* =========================================================================================
*/
-package kamon.metrics
+package kamon.metric
import akka.actor.{ Props, ActorRef, Actor }
-import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
+import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
import kamon.util.GlobPathFilter
import scala.concurrent.duration.{ FiniteDuration, Duration }
import java.util.concurrent.TimeUnit
import kamon.Kamon
-import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer }
+import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
class Subscriptions extends Actor {
import context.system
@@ -65,7 +65,7 @@ class Subscriptions extends Actor {
}
def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
- snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
+ snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
for ((filter, receivers) ← subscriptions) yield {
val selection = snapshots.filter(group ⇒ filter.accept(group._1))
@@ -90,6 +90,7 @@ object Subscriptions {
class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
+ val collectionContext = CollectionContext.default
def receive = empty
@@ -116,14 +117,12 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef
super.postStop()
}
- def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r)))
+ def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext)))
}
object TickMetricSnapshotBuffer {
case object FlushBuffer
- case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot
-
def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
new file mode 100644
index 00000000..1ee1eab4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -0,0 +1,77 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.ActorSystem
+import kamon.metric.instrument.{ Histogram }
+
+import scala.collection.concurrent.TrieMap
+import com.typesafe.config.Config
+
+case class TraceMetrics(name: String) extends MetricGroupIdentity {
+ val category = TraceMetrics
+}
+
+object TraceMetrics extends MetricGroupCategory {
+ val name = "trace"
+
+ case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
+ case class HttpClientRequest(name: String) extends MetricIdentity
+
+ case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram)
+ extends MetricGroupRecorder {
+
+ private val segments = TrieMap[MetricIdentity, Histogram]()
+
+ def segmentRecorder(segmentIdentity: MetricIdentity): Histogram =
+ segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
+
+ def collect(context: CollectionContext): TraceMetricsSnapshot =
+ TraceMetricsSnapshot(
+ elapsedTime.collect(context),
+ segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap)
+
+ def cleanup: Unit = {}
+ }
+
+ case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot])
+ extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = TraceMetricsSnapshot
+
+ def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot =
+ TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty)
+
+ def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime)
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = TraceMetricRecorder
+
+ def create(config: Config, system: ActorSystem): TraceMetricRecorder = {
+
+ val settings = config.getConfig("precision.trace")
+ val elapsedTimeConfig = settings.getConfig("elapsed-time")
+ val segmentConfig = settings.getConfig("segment")
+
+ new TraceMetricRecorder(
+ Histogram.fromConfig(elapsedTimeConfig),
+ () ⇒ Histogram.fromConfig(segmentConfig))
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
new file mode 100644
index 00000000..dea03968
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -0,0 +1,139 @@
+package kamon.metric
+
+import akka.actor
+import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import com.typesafe.config.Config
+import kamon.Kamon
+import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+
+class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ lazy val userMetricsRecorder = Kamon(Metrics)(system).register(UserMetrics, UserMetrics.Factory).get
+
+ def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram =
+ userMetricsRecorder.buildHistogram(name, precision, highestTrackableValue)
+
+ def registerHistogram(name: String): Histogram =
+ userMetricsRecorder.buildHistogram(name)
+
+ def registerCounter(name: String): Counter =
+ userMetricsRecorder.buildCounter(name)
+
+ def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration): MinMaxCounter = {
+ userMetricsRecorder.buildMinMaxCounter(name, precision, highestTrackableValue, refreshInterval)
+ }
+
+ def registerMinMaxCounter(name: String): MinMaxCounter =
+ userMetricsRecorder.buildMinMaxCounter(name)
+
+ def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ userMetricsRecorder.buildGauge(name)(currentValueCollector)
+
+ def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ userMetricsRecorder.buildGauge(name, precision, highestTrackableValue, refreshInterval, currentValueCollector)
+}
+
+object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider with MetricGroupIdentity {
+ def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
+ def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system)
+
+ val name: String = "user-metrics-recorder"
+ val category = new MetricGroupCategory {
+ val name: String = "user-metrics"
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = UserMetricsRecorder
+ def create(config: Config, system: ActorSystem): UserMetricsRecorder = new UserMetricsRecorder(system)
+ }
+
+ class UserMetricsRecorder(system: ActorSystem) extends MetricGroupRecorder {
+ val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision")
+ val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision")
+ val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision")
+ val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision")
+
+ val histograms = TrieMap[String, Histogram]()
+ val counters = TrieMap[String, Counter]()
+ val minMaxCounters = TrieMap[String, MinMaxCounter]()
+ val gauges = TrieMap[String, Gauge]()
+
+ def buildHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram =
+ histograms.getOrElseUpdate(name, Histogram(highestTrackableValue, precision, Scale.Unit))
+
+ def buildHistogram(name: String): Histogram =
+ histograms.getOrElseUpdate(name, Histogram.fromConfig(defaultHistogramPrecisionConfig))
+
+ def buildCounter(name: String): Counter =
+ counters.getOrElseUpdate(name, Counter())
+
+ def buildMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration): MinMaxCounter = {
+ minMaxCounters.getOrElseUpdate(name, MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system))
+ }
+
+ def buildMinMaxCounter(name: String): MinMaxCounter =
+ minMaxCounters.getOrElseUpdate(name, MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system))
+
+ def buildGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration, currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ gauges.getOrElseUpdate(name, Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector))
+
+ def buildGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ gauges.getOrElseUpdate(name, Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector))
+
+ def collect(context: CollectionContext): UserMetricsSnapshot = {
+ val histogramSnapshots = histograms.map {
+ case (name, histogram) ⇒
+ (UserHistogram(name), histogram.collect(context))
+ } toMap
+
+ val counterSnapshots = counters.map {
+ case (name, counter) ⇒
+ (UserCounter(name), counter.collect(context))
+ } toMap
+
+ val minMaxCounterSnapshots = minMaxCounters.map {
+ case (name, minMaxCounter) ⇒
+ (UserMinMaxCounter(name), minMaxCounter.collect(context))
+ } toMap
+
+ val gaugeSnapshots = gauges.map {
+ case (name, gauge) ⇒
+ (UserGauge(name), gauge.collect(context))
+ } toMap
+
+ UserMetricsSnapshot(histogramSnapshots, counterSnapshots, minMaxCounterSnapshots, gaugeSnapshots)
+ }
+
+ def cleanup: Unit = {}
+ }
+
+ case class UserHistogram(name: String) extends MetricIdentity
+ case class UserCounter(name: String) extends MetricIdentity
+ case class UserMinMaxCounter(name: String) extends MetricIdentity
+ case class UserGauge(name: String) extends MetricIdentity
+
+ case class UserMetricsSnapshot(histograms: Map[UserHistogram, Histogram.Snapshot],
+ counters: Map[UserCounter, Counter.Snapshot],
+ minMaxCounters: Map[UserMinMaxCounter, Histogram.Snapshot],
+ gauges: Map[UserGauge, Histogram.Snapshot])
+ extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = UserMetricsSnapshot
+
+ def merge(that: UserMetricsSnapshot, context: CollectionContext): UserMetricsSnapshot =
+ UserMetricsSnapshot(
+ combineMaps(histograms, that.histograms)((l, r) ⇒ l.merge(r, context)),
+ combineMaps(counters, that.counters)((l, r) ⇒ l.merge(r, context)),
+ combineMaps(minMaxCounters, that.minMaxCounters)((l, r) ⇒ l.merge(r, context)),
+ combineMaps(gauges, that.gauges)((l, r) ⇒ l.merge(r, context)))
+
+ def metrics: Map[MetricIdentity, MetricSnapshot] = histograms ++ counters ++ minMaxCounters ++ gauges
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
new file mode 100644
index 00000000..b592bcd3
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
@@ -0,0 +1,59 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import jsr166e.LongAdder
+import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder }
+
+trait Counter extends MetricRecorder {
+ type SnapshotType = Counter.Snapshot
+
+ def increment(): Unit
+ def increment(times: Long): Unit
+}
+
+object Counter {
+
+ def apply(): Counter = new LongAdderCounter
+
+ trait Snapshot extends MetricSnapshot {
+ type SnapshotType = Counter.Snapshot
+
+ def count: Long
+ def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot
+ }
+}
+
+class LongAdderCounter extends Counter {
+ private val counter = new LongAdder
+
+ def increment(): Unit = counter.increment()
+
+ def increment(times: Long): Unit = {
+ if (times < 0)
+ throw new UnsupportedOperationException("Counters cannot be decremented")
+ counter.add(times)
+ }
+
+ def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset())
+
+ def cleanup: Unit = {}
+}
+
+case class CounterSnapshot(count: Long) extends Counter.Snapshot {
+ def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
new file mode 100644
index 00000000..1efff2bc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
@@ -0,0 +1,78 @@
+package kamon.metric.instrument
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.actor.{ Cancellable, ActorSystem }
+import com.typesafe.config.Config
+import kamon.metric.{ CollectionContext, Scale, MetricRecorder }
+
+import scala.concurrent.duration.FiniteDuration
+
+trait Gauge extends MetricRecorder {
+ type SnapshotType = Histogram.Snapshot
+
+ def record(value: Long)
+ def record(value: Long, count: Long)
+}
+
+object Gauge {
+
+ trait CurrentValueCollector {
+ def currentValue: Long
+ }
+
+ def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration,
+ system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
+
+ val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector)
+
+ val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ gauge.refreshValue()
+ }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+
+ gauge.refreshValuesSchedule.set(refreshValuesSchedule)
+ gauge
+ }
+
+ def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge =
+ fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction))
+
+ def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = {
+ val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision")
+ fromConfig(config, system)(currentValueCollector)
+ }
+
+ def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
+ import scala.concurrent.duration._
+
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+
+ Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector)
+ }
+
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+}
+
+class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+
+ def record(value: Long): Unit = underlyingHistogram.record(value)
+
+ def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count)
+
+ def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
+
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+
+ def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue)
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
new file mode 100644
index 00000000..9ae077f4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -0,0 +1,246 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+import com.typesafe.config.Config
+import org.HdrHistogram.AtomicHistogramFieldsAccessor
+import org.HdrHistogram.AtomicHistogram
+import kamon.metric._
+
+trait Histogram extends MetricRecorder {
+ type SnapshotType = Histogram.Snapshot
+
+ def record(value: Long)
+ def record(value: Long, count: Long)
+}
+
+object Histogram {
+
+ def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram =
+ new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale)
+
+ def fromConfig(config: Config): Histogram = {
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+
+ new HdrHistogram(1L, highest, significantDigits)
+ }
+
+ object HighestTrackableValue {
+ val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L
+ }
+
+ case class Precision(significantDigits: Int)
+ object Precision {
+ val Low = Precision(1)
+ val Normal = Precision(2)
+ val Fine = Precision(3)
+ }
+
+ trait Record {
+ def level: Long
+ def count: Long
+
+ private[kamon] def rawCompactRecord: Long
+ }
+
+ case class MutableRecord(var level: Long, var count: Long) extends Record {
+ var rawCompactRecord: Long = 0L
+ }
+
+ trait Snapshot extends MetricSnapshot {
+ type SnapshotType = Histogram.Snapshot
+
+ def isEmpty: Boolean = numberOfMeasurements == 0
+ def scale: Scale
+ def numberOfMeasurements: Long
+ def min: Long
+ def max: Long
+ def recordsIterator: Iterator[Record]
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
+ }
+}
+
+/**
+ * This implementation is meant to be used for real time data collection where data snapshots are taken often over time.
+ * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
+ * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
+ */
+class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit)
+ extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits)
+ with Histogram with AtomicHistogramFieldsAccessor {
+
+ import AtomicHistogramFieldsAccessor.totalCountUpdater
+
+ def record(value: Long): Unit = recordValue(value)
+
+ def record(value: Long, count: Long): Unit = recordValueWithCount(value, count)
+
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ import context.buffer
+ buffer.clear()
+ val nrOfMeasurements = writeSnapshotTo(buffer)
+
+ buffer.flip()
+
+ val measurementsArray = Array.ofDim[Long](buffer.limit())
+ buffer.get(measurementsArray, 0, measurementsArray.length)
+ new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
+ }
+
+ def cleanup: Unit = {}
+
+ private def writeSnapshotTo(buffer: LongBuffer): Long = {
+ val counts = countsArray()
+ val countsLength = counts.length()
+
+ var nrOfMeasurements = 0L
+ var index = 0L
+ while (index < countsLength) {
+ val countAtIndex = counts.getAndSet(index.toInt, 0L)
+
+ if (countAtIndex > 0) {
+ buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex))
+ nrOfMeasurements += countAtIndex
+ }
+
+ index += 1
+ }
+
+ reestablishTotalCount(nrOfMeasurements)
+ nrOfMeasurements
+ }
+
+ private def reestablishTotalCount(diff: Long): Unit = {
+ def tryUpdateTotalCount: Boolean = {
+ val previousTotalCount = getTotalCount
+ val newTotalCount = previousTotalCount - diff
+
+ totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount)
+ }
+
+ while (!tryUpdateTotalCount) {}
+ }
+
+}
+
+class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
+ subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot {
+
+ def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0))
+ def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1))
+
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = {
+ if (that.isEmpty) this else if (this.isEmpty) that else {
+ import context.buffer
+ buffer.clear()
+
+ val selfIterator = recordsIterator
+ val thatIterator = that.recordsIterator
+ var thatCurrentRecord: Histogram.Record = null
+ var mergedNumberOfMeasurements = 0L
+
+ def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
+ def addToBuffer(compactRecord: Long): Unit = {
+ mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
+ buffer.put(compactRecord)
+ }
+
+ while (selfIterator.hasNext) {
+ val selfCurrentRecord = selfIterator.next()
+
+ // Advance that to no further than the level of selfCurrentRecord
+ thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
+ while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
+ addToBuffer(thatCurrentRecord.rawCompactRecord)
+ thatCurrentRecord = nextOrNull(thatIterator)
+ }
+
+ // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
+ if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
+ addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
+ thatCurrentRecord = nextOrNull(thatIterator)
+ } else {
+ addToBuffer(selfCurrentRecord.rawCompactRecord)
+ }
+ }
+
+ // Include everything that might have been left from that
+ if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
+ while (thatIterator.hasNext) {
+ addToBuffer(thatIterator.next().rawCompactRecord)
+ }
+
+ buffer.flip()
+ val compactRecords = Array.ofDim[Long](buffer.limit())
+ buffer.get(compactRecords)
+
+ new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
+ }
+ }
+
+ @inline private def mergeCompactRecords(left: Long, right: Long): Long = {
+ val index = left >> 48
+ val leftCount = countFromCompactRecord(left)
+ val rightCount = countFromCompactRecord(right)
+
+ CompactHdrSnapshot.compactRecord(index, leftCount + rightCount)
+ }
+
+ @inline private def levelFromCompactRecord(compactRecord: Long): Long = {
+ val countsArrayIndex = (compactRecord >> 48).toInt
+ var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1
+ var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount
+ if (bucketIndex < 0) {
+ subBucketIndex -= subBucketHalfCount
+ bucketIndex = 0
+ }
+
+ subBucketIndex.toLong << (bucketIndex + unitMagnitude)
+ }
+
+ @inline private def countFromCompactRecord(compactRecord: Long): Long =
+ compactRecord & CompactHdrSnapshot.CompactRecordCountMask
+
+ def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] {
+ var currentIndex = 0
+ val mutableRecord = Histogram.MutableRecord(0, 0)
+
+ override def hasNext: Boolean = currentIndex < compactRecords.length
+
+ override def next(): Histogram.Record = {
+ if (hasNext) {
+ val measurement = compactRecords(currentIndex)
+ mutableRecord.rawCompactRecord = measurement
+ mutableRecord.level = levelFromCompactRecord(measurement)
+ mutableRecord.count = countFromCompactRecord(measurement)
+ currentIndex += 1
+
+ mutableRecord
+ } else {
+ throw new IllegalStateException("The iterator has already been consumed.")
+ }
+ }
+ }
+}
+
+object CompactHdrSnapshot {
+ val CompactRecordCountMask = 0xFFFFFFFFFFFFL
+
+ def compactRecord(index: Long, count: Long): Long = (index << 48) | count
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
new file mode 100644
index 00000000..471e7bd4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
@@ -0,0 +1,116 @@
+package kamon.metric.instrument
+
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.lang.Math.abs
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+import akka.actor.{ ActorSystem, Cancellable }
+import com.typesafe.config.Config
+import jsr166e.LongMaxUpdater
+import kamon.metric.{ Scale, MetricRecorder, CollectionContext }
+import kamon.util.PaddedAtomicLong
+import scala.concurrent.duration.FiniteDuration
+
+trait MinMaxCounter extends MetricRecorder {
+ override type SnapshotType = Histogram.Snapshot
+
+ def increment(): Unit
+ def increment(times: Long): Unit
+ def decrement()
+ def decrement(times: Long)
+}
+
+object MinMaxCounter {
+
+ def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration,
+ system: ActorSystem): MinMaxCounter = {
+
+ val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram)
+
+ val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ minMaxCounter.refreshValues()
+ }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+
+ minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule)
+ minMaxCounter
+ }
+
+ def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = {
+ import scala.concurrent.duration._
+
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+
+ apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system)
+ }
+}
+
+class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter {
+ private val min = new LongMaxUpdater
+ private val max = new LongMaxUpdater
+ private val sum = new PaddedAtomicLong
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+
+ min.update(0L)
+ max.update(0L)
+
+ def increment(): Unit = increment(1L)
+
+ def increment(times: Long): Unit = {
+ val currentValue = sum.addAndGet(times)
+ max.update(currentValue)
+ }
+
+ def decrement(): Unit = decrement(1L)
+
+ def decrement(times: Long): Unit = {
+ val currentValue = sum.addAndGet(-times)
+ min.update(-currentValue)
+ }
+
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ refreshValues()
+ underlyingHistogram.collect(context)
+ }
+
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+
+ def refreshValues(): Unit = {
+ val currentValue = {
+ val value = sum.get()
+ if (value < 0) 0 else value
+ }
+
+ val currentMin = {
+ val minAbs = abs(min.maxThenReset())
+ if (minAbs <= currentValue) minAbs else 0
+ }
+
+ underlyingHistogram.record(currentValue)
+ underlyingHistogram.record(currentMin)
+ underlyingHistogram.record(max.maxThenReset())
+
+ max.update(currentValue)
+ min.update(-currentValue)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala
index 640157a9..43166058 100644
--- a/kamon-core/src/main/scala/kamon/metrics/package.scala
+++ b/kamon-core/src/main/scala/kamon/metric/package.scala
@@ -19,12 +19,7 @@ package kamon
import scala.annotation.tailrec
import com.typesafe.config.Config
-package object metrics {
-
- case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int)
-
- def extractPrecisionConfig(config: Config): HdrPrecisionConfig =
- HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits"))
+package object metric {
@tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = {
if (right.isEmpty)
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
deleted file mode 100644
index 9e19dced..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import com.typesafe.config.Config
-import kamon.metrics.instruments.CounterRecorder
-import org.HdrHistogram.HdrRecorder
-
-case class ActorMetrics(name: String) extends MetricGroupIdentity {
- val category = ActorMetrics
-}
-
-object ActorMetrics extends MetricGroupCategory {
- val name = "actor"
-
- case object ProcessingTime extends MetricIdentity { val name, tag = "processing-time" }
- case object MailboxSize extends MetricIdentity { val name, tag = "mailbox-size" }
- case object TimeInMailbox extends MetricIdentity { val name, tag = "time-in-mailbox" }
- case object ErrorCounter extends MetricIdentity { val name, tag = "errors" }
-
- case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder, errorCounter: MetricRecorder)
- extends MetricGroupRecorder {
-
- def collect: MetricGroupSnapshot = {
- ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect(), errorCounter.collect())
- }
- }
-
- case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike, errorCounter: MetricSnapshotLike)
- extends MetricGroupSnapshot {
-
- val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map(
- (ProcessingTime -> processingTime),
- (MailboxSize -> mailboxSize),
- (TimeInMailbox -> timeInMailbox),
- (ErrorCounter -> errorCounter))
- }
-
- val Factory = new MetricGroupFactory {
- type GroupRecorder = ActorMetricRecorder
-
- def create(config: Config): ActorMetricRecorder = {
- val settings = config.getConfig("precision.actor")
-
- val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time"))
- val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size"))
- val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox"))
-
- new ActorMetricRecorder(
- HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano),
- HdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit),
- HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano),
- CounterRecorder())
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala
deleted file mode 100644
index cd0afac1..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import kamon.metrics.instruments.ContinuousHdrRecorder
-import org.HdrHistogram.HdrRecorder
-import com.typesafe.config.Config
-
-case class CustomMetric(name: String) extends MetricGroupIdentity {
- val category = CustomMetric
-}
-
-object CustomMetric extends MetricGroupCategory {
- val name = "custom-metric"
- val RecordedValues = new MetricIdentity { val name, tag = "recorded-values" }
-
- def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) =
- new MetricGroupFactory {
-
- type GroupRecorder = CustomMetricRecorder
-
- def create(config: Config): CustomMetricRecorder = {
- val recorder =
- if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale)
- else HdrRecorder(highestTrackableValue, significantValueDigits, scale)
-
- new CustomMetricRecorder(RecordedValues, recorder)
- }
- }
-
- class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder)
- extends MetricGroupRecorder {
-
- def record(value: Long): Unit = underlyingRecorder.record(value)
-
- def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect())))
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala
deleted file mode 100644
index f41e0c3f..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/DispatcherMetrics.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import com.typesafe.config.Config
-import org.HdrHistogram.HdrRecorder
-
-case class DispatcherMetrics(name: String) extends MetricGroupIdentity {
- val category = DispatcherMetrics
-}
-
-object DispatcherMetrics extends MetricGroupCategory {
- val name = "dispatcher"
-
- case object MaximumPoolSize extends MetricIdentity { val name, tag = "maximum-pool-size" }
- case object RunningThreadCount extends MetricIdentity { val name, tag = "running-thread-count" }
- case object QueueTaskCount extends MetricIdentity { val name, tag = "queued-task-count" }
- case object PoolSize extends MetricIdentity { val name, tag = "pool-size" }
-
- case class DispatcherMetricRecorder(maximumPoolSize: MetricRecorder, runningThreadCount: MetricRecorder, queueTaskCount: MetricRecorder, poolSize: MetricRecorder)
- extends MetricGroupRecorder {
-
- def collect: MetricGroupSnapshot = {
- DispatcherMetricSnapshot(maximumPoolSize.collect(), runningThreadCount.collect(), queueTaskCount.collect(), poolSize.collect())
- }
- }
-
- case class DispatcherMetricSnapshot(maximumPoolSize: MetricSnapshotLike, runningThreadCount: MetricSnapshotLike, queueTaskCount: MetricSnapshotLike, poolSize: MetricSnapshotLike)
- extends MetricGroupSnapshot {
-
- val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map(
- (MaximumPoolSize -> maximumPoolSize),
- (RunningThreadCount -> runningThreadCount),
- (QueueTaskCount -> queueTaskCount),
- (PoolSize -> poolSize))
- }
-
- val Factory = new MetricGroupFactory {
- type GroupRecorder = DispatcherMetricRecorder
-
- def create(config: Config): DispatcherMetricRecorder = {
- val settings = config.getConfig("precision.dispatcher")
-
- val MaximumPoolSizeConfig = extractPrecisionConfig(settings.getConfig("maximum-pool-size"))
- val RunningThreadCountConfig = extractPrecisionConfig(settings.getConfig("running-thread-count"))
- val QueueTaskCountConfig = extractPrecisionConfig(settings.getConfig("queued-task-count"))
- val PoolSizeConfig = extractPrecisionConfig(settings.getConfig("pool-size"))
-
- new DispatcherMetricRecorder(
- HdrRecorder(MaximumPoolSizeConfig.highestTrackableValue, MaximumPoolSizeConfig.significantValueDigits, Scale.Unit),
- HdrRecorder(RunningThreadCountConfig.highestTrackableValue, RunningThreadCountConfig.significantValueDigits, Scale.Unit),
- HdrRecorder(QueueTaskCountConfig.highestTrackableValue, QueueTaskCountConfig.significantValueDigits, Scale.Unit),
- HdrRecorder(PoolSizeConfig.highestTrackableValue, PoolSizeConfig.significantValueDigits, Scale.Unit))
- }
- }
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala
deleted file mode 100644
index f07bf38e..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import annotation.tailrec
-import com.typesafe.config.Config
-import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.InstrumentType
-
-trait MetricGroupCategory {
- def name: String
-}
-
-trait MetricGroupIdentity {
- def name: String
- def category: MetricGroupCategory
-}
-
-trait MetricIdentity {
- def name: String
- def tag: String
-}
-
-trait MetricGroupRecorder {
- def collect: MetricGroupSnapshot
-}
-
-trait MetricGroupSnapshot {
- def metrics: Map[MetricIdentity, MetricSnapshotLike]
-}
-
-case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot
-
-trait MetricRecorder {
- def record(value: Long)
- def collect(): MetricSnapshotLike
-}
-
-object InstrumentTypes {
- sealed trait InstrumentType
- case object Histogram extends InstrumentType
- case object Gauge extends InstrumentType
- case object Counter extends InstrumentType
-}
-
-trait MetricSnapshotLike {
- def instrumentType: InstrumentType
- def numberOfMeasurements: Long
- def scale: Scale
- def measurements: Vector[Measurement]
-
- def max: Long = measurements.lastOption.map(_.value).getOrElse(0)
- def min: Long = measurements.headOption.map(_.value).getOrElse(0)
-
- def merge(that: MetricSnapshotLike): MetricSnapshotLike = {
- val mergedMeasurements = Vector.newBuilder[Measurement]
-
- @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = {
- if (left.nonEmpty && right.nonEmpty) {
- val leftValue = left.head
- val rightValue = right.head
-
- if (rightValue.value == leftValue.value) {
- val merged = rightValue.merge(leftValue)
- mergedMeasurements += merged
- go(left.tail, right.tail, totalNrOfMeasurements + merged.count)
- } else {
- if (leftValue.value < rightValue.value) {
- mergedMeasurements += leftValue
- go(left.tail, right, totalNrOfMeasurements + leftValue.count)
- } else {
- mergedMeasurements += rightValue
- go(left, right.tail, totalNrOfMeasurements + rightValue.count)
- }
- }
- } else {
- if (left.isEmpty && right.nonEmpty) {
- mergedMeasurements += right.head
- go(left, right.tail, totalNrOfMeasurements + right.head.count)
- } else {
- if (left.nonEmpty && right.isEmpty) {
- mergedMeasurements += left.head
- go(left.tail, right, totalNrOfMeasurements + left.head.count)
- } else totalNrOfMeasurements
- }
- }
- }
-
- val totalNrOfMeasurements = go(measurements, that.measurements, 0)
- MetricSnapshot(instrumentType, totalNrOfMeasurements, scale, mergedMeasurements.result())
- }
-}
-
-case class MetricSnapshot(instrumentType: InstrumentType, numberOfMeasurements: Long, scale: Scale,
- measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike
-
-object MetricSnapshot {
- case class Measurement(value: Long, count: Long) {
- def merge(that: Measurement) = Measurement(value, count + that.count)
- }
-}
-
-trait MetricGroupFactory {
- type GroupRecorder <: MetricGroupRecorder
- def create(config: Config): GroupRecorder
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
deleted file mode 100644
index 5454edf5..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import org.HdrHistogram.HdrRecorder
-import scala.collection.concurrent.TrieMap
-import com.typesafe.config.Config
-
-case class TraceMetrics(name: String) extends MetricGroupIdentity {
- val category = TraceMetrics
-}
-
-object TraceMetrics extends MetricGroupCategory {
- val name = "trace"
-
- case object ElapsedTime extends MetricIdentity { val name, tag = "elapsed-time" }
- case class HttpClientRequest(name: String, tag: String) extends MetricIdentity
-
- class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder)
- extends MetricGroupRecorder {
-
- private val segments = TrieMap[MetricIdentity, HdrRecorder]()
-
- def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder =
- segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
-
- def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(),
- segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap)
- }
-
- case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike])
- extends MetricGroupSnapshot {
-
- def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime)
- }
-
- val Factory = new MetricGroupFactory {
- type GroupRecorder = TraceMetricRecorder
-
- def create(config: Config): TraceMetricRecorder = {
-
- val settings = config.getConfig("precision.trace")
- val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time"))
- val segmentConfig = extractPrecisionConfig(settings.getConfig("segment"))
-
- new TraceMetricRecorder(
- HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano),
- () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano))
- }
- }
-
-}
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala
deleted file mode 100644
index 3a39ec69..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics.instruments
-
-import org.HdrHistogram.HdrRecorder
-import kamon.metrics.{ Scale, MetricSnapshotLike }
-
-/**
- * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is
- * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this
- * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue,
- * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing
- * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value
- * after a new event is processed.
- *
- */
-class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale)
- extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) {
-
- @volatile private var lastRecordedValue: Long = 0
-
- override def record(value: Long): Unit = {
- lastRecordedValue = value
- super.record(value)
- }
-
- override def collect(): MetricSnapshotLike = {
- val snapshot = super.collect()
- super.record(lastRecordedValue)
-
- snapshot
- }
-}
-
-object ContinuousHdrRecorder {
- def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) =
- new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala
deleted file mode 100644
index ce4fd76d..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package org.HdrHistogram
-
-import java.util.concurrent.atomic.AtomicLongFieldUpdater
-import scala.annotation.tailrec
-import kamon.metrics._
-
-/**
- * This implementation aims to be used for real time data collection where data snapshots are taken often over time.
- * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still
- * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
- */
-class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale)
- extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder {
-
- import HdrRecorder.totalCountUpdater
-
- def record(value: Long): Unit = recordValue(value)
-
- def collect(): MetricSnapshotLike = {
- val entries = Vector.newBuilder[MetricSnapshot.Measurement]
- val countsLength = counts.length()
-
- @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = {
- if (index < countsLength) {
- val currentValue = previousValue + increment
- val countAtValue = counts.getAndSet(index, 0)
-
- if (countAtValue > 0)
- entries += MetricSnapshot.Measurement(currentValue, countAtValue)
-
- if (currentValue == bucketLimit)
- iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1)
- else
- iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment)
- } else {
- nrOfRecordings
- }
- }
-
- val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1)
-
- def tryUpdateTotalCount: Boolean = {
- val previousTotalCount = getTotalCount
- val newTotalCount = previousTotalCount - nrOfRecordings
-
- totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount)
- }
-
- while (!tryUpdateTotalCount) {}
-
- MetricSnapshot(InstrumentTypes.Histogram, nrOfRecordings, scale, entries.result())
- }
-
-}
-
-object HdrRecorder {
- val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount")
-
- def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder =
- new HdrRecorder(highestTrackableValue, significantValueDigits, scale)
-
-}
diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala
deleted file mode 100644
index ba2550af..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/instruments/MinMaxCounter.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package kamon.metrics.instruments
-
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-import java.lang.Math._
-import jsr166e.LongMaxUpdater
-import kamon.util.PaddedAtomicLong
-import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
-
-class MinMaxCounter {
- private val min = new LongMaxUpdater
- private val max = new LongMaxUpdater
- private val sum = new PaddedAtomicLong
-
- min.update(0L)
- max.update(0L)
-
- def increment(value: Long = 1L): Unit = {
- val currentValue = sum.addAndGet(value)
- max.update(currentValue)
- }
-
- def decrement(value: Long = 1L): Unit = {
- val currentValue = sum.addAndGet(-value)
- min.update(-currentValue)
- }
-
- def collect(): CounterMeasurement = {
- val currentValue = {
- val value = sum.get()
- if (value < 0) 0 else value
- }
- val result = CounterMeasurement(abs(min.maxThenReset()), max.maxThenReset(), currentValue)
- max.update(currentValue)
- min.update(-currentValue)
- result
- }
-}
-
-object MinMaxCounter {
- def apply() = new MinMaxCounter()
-
- case class CounterMeasurement(min: Long, max: Long, current: Long)
-}
diff --git a/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala
new file mode 100644
index 00000000..258cc1b2
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/standalone/KamonStandalone.scala
@@ -0,0 +1,11 @@
+package kamon.standalone
+
+import akka.actor.ActorSystem
+
+object KamonStandalone {
+ private lazy val system = ActorSystem("kamon-standalone")
+
+ def registerHistogram(name: String) = {
+
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 307cf17a..9ce3cd4e 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -18,11 +18,11 @@ package kamon.trace
import akka.actor.ActorSystem
import kamon.Kamon
-import kamon.metrics._
+import kamon.metric._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.trace.TraceContext.SegmentIdentity
-import kamon.metrics.TraceMetrics.TraceMetricRecorder
+import kamon.metric.TraceMetrics.TraceMetricRecorder
trait TraceContext {
def name: String
@@ -41,7 +41,7 @@ object TraceContext {
}
trait SegmentCompletionHandle {
- def finish(metadata: Map[String, String])
+ def finish(metadata: Map[String, String] = Map.empty)
}
case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])
@@ -76,7 +76,7 @@ object SegmentCompletionHandleAware {
}
class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String],
- val system: ActorSystem) extends TraceContext {
+ val system: ActorSystem) extends TraceContext {
@volatile private var _isOpen = true
val levelOfDetail = OnlyMetrics
val startMark = System.nanoTime()
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index 0e264cd2..efe08cdb 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -33,7 +33,7 @@ object TraceRecorder {
def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet())
private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String],
- system: ActorSystem): TraceContext = {
+ system: ActorSystem): TraceContext = {
// In the future this should select between implementations.
val finalToken = token.getOrElse(newToken)
@@ -51,7 +51,7 @@ object TraceRecorder {
traceContextStorage.set(Some(ctx))
}
- def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] =
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String] = Map.empty): Option[SegmentCompletionHandle] =
currentContext.map(_.startSegment(identity, metadata))
def rename(name: String): Unit = currentContext.map(_.rename(name))
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala
index 4e62c9f7..0f682500 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala
@@ -13,19 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.trace.instrumentation
-
-import org.scalatest.WordSpecLike
-import akka.actor.{ Actor, Props, ActorSystem }
+package kamon.instrumentation.akka
+import akka.actor.{ Actor, ActorSystem, Props }
+import akka.pattern.{ ask, pipe }
+import akka.routing.RoundRobinPool
import akka.testkit.{ ImplicitSender, TestKit }
-import kamon.trace.TraceRecorder
-import akka.pattern.{ pipe, ask }
import akka.util.Timeout
+import kamon.trace.TraceRecorder
+import org.scalatest.WordSpecLike
+
import scala.concurrent.duration._
-import akka.routing.{ RoundRobinPool }
-class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender {
+class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instrumentation-spec")) with WordSpecLike
+ with ImplicitSender {
+
implicit val executionContext = system.dispatcher
"the message passing instrumentation" should {
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala
index 81fd9cbc..3dab44bc 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala
@@ -13,15 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.trace.instrumentation
+package kamon.instrumentation.akka
-import akka.testkit.TestKit
-import org.scalatest.{ Inspectors, Matchers, WordSpecLike }
-import akka.actor.{ Props, ActorLogging, Actor, ActorSystem }
+import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import akka.event.Logging.LogEvent
+import akka.testkit.TestKit
import kamon.trace.{ TraceContextAware, TraceRecorder }
+import org.scalatest.{ Inspectors, Matchers, WordSpecLike }
-class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors {
+class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging-instrumentation-spec")) with WordSpecLike
+ with Matchers with Inspectors {
"the ActorLogging instrumentation" should {
"attach the TraceContext (if available) to log events" in {
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala
index ed239b38..47867c55 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala
@@ -1,14 +1,17 @@
-package kamon.trace.instrumentation
+package kamon.instrumentation.akka
-import akka.testkit.{ ImplicitSender, TestKit }
+import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop }
import akka.actor._
-import org.scalatest.WordSpecLike
+import akka.testkit.{ ImplicitSender, TestKit }
import kamon.trace.TraceRecorder
-import scala.util.control.NonFatal
-import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume }
+import org.scalatest.WordSpecLike
+
import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-system-message-instrumentation-spec"))
+ with WordSpecLike with ImplicitSender {
-class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender {
implicit val executionContext = system.dispatcher
"the system message passing instrumentation" should {
@@ -107,7 +110,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
}
def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false,
- sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = {
+ sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = {
class GrandParent extends Actor {
val child = context.actorOf(Props(new Parent))
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
index fb886de6..d914ffe8 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
@@ -14,19 +14,20 @@
* =========================================================================================
*/
-package kamon.trace.instrumentation
+package kamon.instrumentation.akka
-import akka.testkit.TestKitBase
-import akka.actor.{ Props, Actor, ActorSystem }
-import org.scalatest.{ Matchers, WordSpecLike }
+import akka.actor.{ Actor, ActorSystem, Props }
import akka.event.Logging.Warning
-import scala.concurrent.duration._
import akka.pattern.ask
+import akka.testkit.TestKitBase
import akka.util.Timeout
-import kamon.trace.{ TraceContextAware, TraceRecorder }
import com.typesafe.config.ConfigFactory
+import kamon.trace.{ TraceContextAware, TraceRecorder }
+import org.scalatest.{ Matchers, WordSpecLike }
+
+import scala.concurrent.duration._
-class AskPatternTracingSpec extends TestKitBase with WordSpecLike with Matchers {
+class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers {
implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString(
"""
|kamon {
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala
index b1765fd8..31afd3ff 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/scala/FutureInstrumentationSpec.scala
@@ -13,16 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.trace.instrumentation
+package kamon.instrumentation.scala
-import scala.concurrent.{ ExecutionContext, Future }
-import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
-import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration }
-import kamon.trace.TraceRecorder
-import akka.testkit.TestKit
import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import kamon.trace.TraceRecorder
+import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
+import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
+
+import scala.concurrent.Future
-class FutureTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with Matchers
+class FutureInstrumentationSpec extends TestKit(ActorSystem("future-instrumentation-spec")) with WordSpecLike with Matchers
with ScalaFutures with PatienceConfiguration with OptionValues {
implicit val execContext = system.dispatcher
diff --git a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala
new file mode 100644
index 00000000..481f03c5
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala
@@ -0,0 +1,202 @@
+/* =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import java.nio.LongBuffer
+
+import akka.instrumentation.ActorCellMetrics
+import kamon.metric.ActorMetricsTestActor._
+import kamon.metric.instrument.Histogram.MutableRecord
+import org.scalatest.{ WordSpecLike, Matchers }
+import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase }
+import akka.actor._
+import com.typesafe.config.ConfigFactory
+import scala.concurrent.duration._
+import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot }
+
+class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | filters = [
+ | {
+ | actor {
+ | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect" ]
+ | excludes = [ "user/tracked-explicitly-excluded"]
+ | }
+ | }
+ | ]
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 3600000000000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-min-max-counter-precision {
+ | refresh-interval = 1 second
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ "the Kamon actor metrics" should {
+ "respect the configured include and exclude filters" in new ActorMetricsFixtures {
+ val trackedActor = createTestActor("tracked-actor")
+ actorMetricsRecorderOf(trackedActor) should not be empty
+
+ val nonTrackedActor = createTestActor("non-tracked-actor")
+ actorMetricsRecorderOf(nonTrackedActor) shouldBe empty
+
+ val trackedButExplicitlyExcluded = createTestActor("tracked-explicitly-excluded")
+ actorMetricsRecorderOf(trackedButExplicitlyExcluded) shouldBe empty
+ }
+
+ "reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures {
+ val trackedActor = createTestActor("clean-after-collect")
+ val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get
+ for (i ← 1 to 100) {
+ trackedActor ! Discard
+ }
+ trackedActor ! Fail
+ trackedActor ! TrackTimings(sleep = Some(1 second))
+ expectMsgType[TrackedTimings]
+
+ val firstSnapshot = takeSnapshotOf(trackedActorMetrics)
+ firstSnapshot.errors.count should be(1L)
+ firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L
+ firstSnapshot.processingTime.numberOfMeasurements should be(103L) // 102 examples + Initialize message
+ firstSnapshot.timeInMailbox.numberOfMeasurements should be(103L) // 102 examples + Initialize message
+
+ val secondSnapshot = takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean
+ secondSnapshot.errors.count should be(0L)
+ secondSnapshot.mailboxSize.numberOfMeasurements should be <= 3L
+ secondSnapshot.processingTime.numberOfMeasurements should be(0L) // 102 examples + Initialize message
+ secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) // 102 examples + Initialize message
+ }
+
+ "record the processing-time of the receive function" in new ActorMetricsFixtures {
+ val trackedActor = createTestActor("measuring-processing-time")
+ val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get
+ takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean
+
+ trackedActor ! TrackTimings(sleep = Some(1 second))
+ val timings = expectMsgType[TrackedTimings]
+ val snapshot = takeSnapshotOf(trackedActorMetrics)
+
+ snapshot.processingTime.numberOfMeasurements should be(1L)
+ snapshot.processingTime.recordsIterator.next().count should be(1L)
+ snapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos)
+ }
+
+ "record the number of errors" in new ActorMetricsFixtures {
+ val trackedActor = createTestActor("measuring-errors")
+ val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get
+ takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean
+
+ for (i ← 1 to 10) { trackedActor ! Fail }
+ trackedActor ! Ping
+ expectMsg(Pong)
+ val snapshot = takeSnapshotOf(trackedActorMetrics)
+
+ snapshot.errors.count should be(10)
+ }
+
+ "record the mailbox-size" in new ActorMetricsFixtures {
+ val trackedActor = createTestActor("measuring-mailbox-size")
+ val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get
+ takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean
+
+ trackedActor ! TrackTimings(sleep = Some(1 second))
+ for (i ← 1 to 10) {
+ trackedActor ! Discard
+ }
+ trackedActor ! Ping
+
+ val timings = expectMsgType[TrackedTimings]
+ expectMsg(Pong)
+ val snapshot = takeSnapshotOf(trackedActorMetrics)
+
+ snapshot.mailboxSize.min should be(0L)
+ snapshot.mailboxSize.max should be(11L +- 1L)
+ }
+
+ "record the time-in-mailbox" in new ActorMetricsFixtures {
+ val trackedActor = createTestActor("measuring-time-in-mailbox")
+ val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get
+ takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean
+
+ trackedActor ! TrackTimings(sleep = Some(1 second))
+ val timings = expectMsgType[TrackedTimings]
+ val snapshot = takeSnapshotOf(trackedActorMetrics)
+
+ snapshot.timeInMailbox.numberOfMeasurements should be(1L)
+ snapshot.timeInMailbox.recordsIterator.next().count should be(1L)
+ snapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos)
+ }
+ }
+
+ trait ActorMetricsFixtures {
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
+ }
+
+ def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = {
+ val initialisationListener = TestProbe()
+ ref.tell(Ping, initialisationListener.ref)
+ initialisationListener.expectMsg(Pong)
+
+ val underlyingCellField = ref.getClass.getDeclaredMethod("underlying")
+ val cell = underlyingCellField.invoke(ref).asInstanceOf[ActorCellMetrics]
+
+ cell.actorMetricsRecorder
+ }
+
+ def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name)
+
+ def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext)
+ }
+}
+
+class ActorMetricsTestActor extends Actor {
+ def receive = {
+ case Discard ⇒
+ case Fail ⇒ 1 / 0
+ case Ping ⇒ sender ! Pong
+ case TrackTimings(sendTimestamp, sleep) ⇒ {
+ val dequeueTimestamp = System.nanoTime()
+ sleep.map(s ⇒ Thread.sleep(s.toMillis))
+ val afterReceiveTimestamp = System.nanoTime()
+
+ sender ! TrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp)
+ }
+ }
+}
+
+object ActorMetricsTestActor {
+ case object Ping
+ case object Pong
+ case object Fail
+ case object Discard
+
+ case class TrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None)
+ case class TrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) {
+ def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp
+ def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala
index 2a9cb6b4..7434c4ee 100644
--- a/kamon-core/src/test/scala/kamon/metrics/DispatcherMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala
@@ -13,7 +13,7 @@
* =========================================================================================
*/
-package kamon.metrics
+package kamon.metric
import org.scalatest.{ WordSpecLike, Matchers }
import akka.testkit.{ TestProbe, TestKitBase }
@@ -21,8 +21,8 @@ import akka.actor.{ ActorRef, Props, ActorSystem }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import kamon.Kamon
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.DispatcherMetrics.DispatcherMetricSnapshot
+import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.DispatcherMetrics.DispatcherMetricSnapshot
class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString(
@@ -52,8 +52,8 @@ class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers
"the Kamon dispatcher metrics" should {
"respect the configured include and exclude filters" in {
- system.actorOf(Props[DelayableActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher")
- system.actorOf(Props[DelayableActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher")
+ system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher")
+ system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher")
Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true)
expectMsgType[TickMetricSnapshot]
@@ -69,7 +69,7 @@ class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers
val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher")
for (_ ← 1 to 100) {
- delayable ! Discard
+ //delayable ! Discard
}
val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds)
@@ -92,7 +92,7 @@ class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers
trait DelayableActorFixture {
def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = {
- val actor = system.actorOf(Props[DelayableActor].withDispatcher(dispatcher), name)
+ val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name)
val metricsListener = TestProbe()
Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true)
diff --git a/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
index d0a0c707..ee851672 100644
--- a/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
@@ -14,16 +14,33 @@
* =========================================================================================
*/
-package kamon.metrics
+package kamon.metric
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.instrument.Histogram
+import kamon.metric.instrument.Histogram.MutableRecord
import org.scalatest.{ Matchers, WordSpecLike }
-import akka.testkit.TestKit
+import akka.testkit.{ ImplicitSender, TestKitBase }
import akka.actor.ActorSystem
import scala.concurrent.duration._
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.MetricSnapshot.Measurement
-
-class TickMetricSnapshotBufferSpec extends TestKit(ActorSystem("tick-metric-snapshot-buffer")) with WordSpecLike with Matchers {
+import kamon.metric.Subscriptions.TickMetricSnapshot
+
+class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 hour
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ | ]
+ |}
+ """.stripMargin))
"the TickMetricSnapshotBuffer" should {
"merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures {
@@ -55,27 +72,38 @@ class TickMetricSnapshotBufferSpec extends TestKit(ActorSystem("tick-metric-snap
mergedSnapshot.to should equal(4000)
mergedSnapshot.metrics should not be ('empty)
- val testMetricSnapshot = mergedSnapshot.metrics(CustomMetric("test-metric")).metrics(CustomMetric.RecordedValues)
- testMetricSnapshot.min should equal(1)
- testMetricSnapshot.max should equal(10)
- testMetricSnapshot.numberOfMeasurements should equal(35)
- testMetricSnapshot.measurements should contain allOf (Measurement(1, 10), Measurement(4, 9), Measurement(10, 16))
+ val testMetricSnapshot = mergedSnapshot.metrics(testTraceIdentity).metrics(TraceMetrics.ElapsedTime).asInstanceOf[Histogram.Snapshot]
+ testMetricSnapshot.min should equal(10)
+ testMetricSnapshot.max should equal(300)
+ testMetricSnapshot.numberOfMeasurements should equal(6)
+ testMetricSnapshot.recordsIterator.toStream should contain allOf (
+ MutableRecord(10, 3),
+ MutableRecord(20, 1),
+ MutableRecord(30, 1),
+ MutableRecord(300, 1))
}
}
trait SnapshotFixtures {
+ val collectionContext = CollectionContext.default
+ val testTraceIdentity = TraceMetrics("buffer-spec-test-trace")
+ val traceRecorder = Kamon(Metrics).register(testTraceIdentity, TraceMetrics.Factory).get
+
val firstEmpty = TickMetricSnapshot(1000, 2000, Map.empty)
val secondEmpty = TickMetricSnapshot(2000, 3000, Map.empty)
val thirdEmpty = TickMetricSnapshot(3000, 4000, Map.empty)
- val firstNonEmpty = TickMetricSnapshot(1000, 2000,
- Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(InstrumentTypes.Histogram, 20, Scale.Unit, Vector(Measurement(1, 10), Measurement(10, 10))))))))
-
- val secondNonEmpty = TickMetricSnapshot(1000, 2000,
- Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(InstrumentTypes.Histogram, 15, Scale.Unit, Vector(Measurement(4, 9), Measurement(10, 6))))))))
-
+ traceRecorder.elapsedTime.record(10L)
+ traceRecorder.elapsedTime.record(20L)
+ traceRecorder.elapsedTime.record(30L)
+ val firstNonEmpty = TickMetricSnapshot(1000, 2000, Map(
+ (testTraceIdentity -> traceRecorder.collect(collectionContext))))
+
+ traceRecorder.elapsedTime.record(10L)
+ traceRecorder.elapsedTime.record(10L)
+ traceRecorder.elapsedTime.record(300L)
+ val secondNonEmpty = TickMetricSnapshot(1000, 2000, Map(
+ (testTraceIdentity -> traceRecorder.collect(collectionContext))))
}
-
- case class SimpleGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot
}
diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
new file mode 100644
index 00000000..dab9b52a
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
@@ -0,0 +1,92 @@
+package kamon.metric
+
+import akka.actor.ActorSystem
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
+import kamon.trace.TraceContext.SegmentIdentity
+import kamon.trace.TraceRecorder
+import org.scalatest.{ Matchers, WordSpecLike }
+
+class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 hour
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ | ]
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 3600000000000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-min-max-counter-precision {
+ | refresh-interval = 1 second
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ "the TraceMetrics" should {
+ "record the elapsed time between a trace creation and finish" in {
+ for (repetitions ← 1 to 10) {
+ TraceRecorder.withNewTraceContext("record-elapsed-time") {
+ TraceRecorder.finish()
+ }
+ }
+
+ val snapshot = takeSnapshotOf("record-elapsed-time")
+ snapshot.elapsedTime.numberOfMeasurements should be(10)
+ snapshot.segments shouldBe empty
+ }
+
+ "record the elapsed time for segments that occur inside a given trace" in {
+ TraceRecorder.withNewTraceContext("trace-with-segments") {
+ val segmentHandle = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment"))
+ segmentHandle.get.finish()
+ TraceRecorder.finish()
+ }
+
+ val snapshot = takeSnapshotOf("trace-with-segments")
+ snapshot.elapsedTime.numberOfMeasurements should be(1)
+ snapshot.segments.size should be(1)
+ snapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1)
+ }
+
+ "record the elapsed time for segments that finish after their correspondent trace has finished" in {
+ val segmentHandle = TraceRecorder.withNewTraceContext("closing-segment-after-trace") {
+ val sh = TraceRecorder.startSegment(TraceMetricsTestSegment("test-segment"))
+ TraceRecorder.finish()
+ sh
+ }
+
+ val beforeFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace")
+ beforeFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ beforeFinishSegmentSnapshot.segments.size should be(0)
+
+ segmentHandle.get.finish()
+
+ val afterFinishSegmentSnapshot = takeSnapshotOf("closing-segment-after-trace")
+ afterFinishSegmentSnapshot.elapsedTime.numberOfMeasurements should be(0)
+ afterFinishSegmentSnapshot.segments.size should be(1)
+ afterFinishSegmentSnapshot.segments(TraceMetricsTestSegment("test-segment")).numberOfMeasurements should be(1)
+ }
+ }
+
+ case class TraceMetricsTestSegment(name: String) extends SegmentIdentity
+
+ def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
+ val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
+ recorder.get.collect(CollectionContext.default)
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
new file mode 100644
index 00000000..57bc3d0d
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
@@ -0,0 +1,278 @@
+package kamon.metric
+
+import akka.actor.ActorSystem
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.UserMetrics.{ UserGauge, UserMinMaxCounter, UserCounter, UserHistogram }
+import kamon.metric.instrument.Histogram
+import kamon.metric.instrument.Histogram.MutableRecord
+import org.scalatest.{ Matchers, WordSpecLike }
+import scala.concurrent.duration._
+
+class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | flush-interval = 1 hour
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 10000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-min-max-counter-precision {
+ | refresh-interval = 1 hour
+ | highest-trackable-value = 1000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-gauge-precision {
+ | refresh-interval = 1 hour
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ "the UserMetrics extension" should {
+ "allow registering a fully configured Histogram and get the same Histogram if registering again" in {
+ val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
+ val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
+
+ histogramA shouldBe theSameInstanceAs(histogramB)
+ }
+
+ "return the original Histogram when registering a fully configured Histogram for second time but with different settings" in {
+ val histogramA = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
+ val histogramB = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Fine, 50000L)
+
+ histogramA shouldBe theSameInstanceAs(histogramB)
+ }
+
+ "allow registering a Histogram that takes the default configuration from the kamon.metrics.precision settings" in {
+ Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration")
+ }
+
+ "allow registering a Counter and get the same Counter if registering again" in {
+ val counterA = Kamon(UserMetrics).registerCounter("counter")
+ val counterB = Kamon(UserMetrics).registerCounter("counter")
+
+ counterA shouldBe theSameInstanceAs(counterB)
+ }
+
+ "allow registering a fully configured MinMaxCounter and get the same MinMaxCounter if registering again" in {
+ val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
+ val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
+
+ minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB)
+ }
+
+ "return the original MinMaxCounter when registering a fully configured MinMaxCounter for second time but with different settings" in {
+ val minMaxCounterA = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
+ val minMaxCounterB = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Fine, 5000L, 1 second)
+
+ minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB)
+ }
+
+ "allow registering a MinMaxCounter that takes the default configuration from the kamon.metrics.precision settings" in {
+ Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-default-configuration")
+ }
+
+ "allow registering a fully configured Gauge and get the same Gauge if registering again" in {
+ val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) {
+ () ⇒ 1L
+ }
+
+ val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) {
+ () ⇒ 1L
+ }
+
+ gaugeA shouldBe theSameInstanceAs(gaugeB)
+ }
+
+ "return the original Gauge when registering a fully configured Gauge for second time but with different settings" in {
+ val gaugeA = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Normal, 1000L, 1 second) {
+ () ⇒ 1L
+ }
+
+ val gaugeB = Kamon(UserMetrics).registerGauge("gauge-with-settings", Histogram.Precision.Fine, 5000L, 1 second) {
+ () ⇒ 1L
+ }
+
+ gaugeA shouldBe theSameInstanceAs(gaugeB)
+ }
+
+ "allow registering a Gauge that takes the default configuration from the kamon.metrics.precision settings" in {
+ Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") {
+ () ⇒ 2L
+ }
+ }
+
+ "generate a snapshot containing all the registered user metrics and reset all instruments" in {
+ val context = CollectionContext.default
+ val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get
+
+ val histogramWithSettings = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
+ val histogramWithDefaultConfiguration = Kamon(UserMetrics).registerHistogram("histogram-with-default-configuration")
+ val counter = Kamon(UserMetrics).registerCounter("counter")
+ val minMaxCounterWithSettings = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-with-settings", Histogram.Precision.Normal, 1000L, 1 second)
+ val gauge = Kamon(UserMetrics).registerGauge("gauge-with-default-configuration") { () ⇒ 2L }
+
+ // lets put some values on those metrics
+ histogramWithSettings.record(10)
+ histogramWithSettings.record(20, 100)
+ histogramWithDefaultConfiguration.record(40)
+
+ counter.increment()
+ counter.increment(16)
+
+ minMaxCounterWithSettings.increment(43)
+ minMaxCounterWithSettings.decrement()
+
+ gauge.record(15)
+
+ val firstSnapshot = userMetricsRecorder.collect(context)
+
+ firstSnapshot.histograms.size should be(2)
+ firstSnapshot.histograms.keys should contain allOf (
+ UserHistogram("histogram-with-settings"),
+ UserHistogram("histogram-with-default-configuration"))
+
+ firstSnapshot.histograms(UserHistogram("histogram-with-settings")).min shouldBe (10)
+ firstSnapshot.histograms(UserHistogram("histogram-with-settings")).max shouldBe (20)
+ firstSnapshot.histograms(UserHistogram("histogram-with-settings")).numberOfMeasurements should be(101)
+ firstSnapshot.histograms(UserHistogram("histogram-with-settings")).recordsIterator.toStream should contain allOf (
+ MutableRecord(10, 1),
+ MutableRecord(20, 100))
+
+ firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).min shouldBe (40)
+ firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).max shouldBe (40)
+ firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).numberOfMeasurements should be(1)
+ firstSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).recordsIterator.toStream should contain only (
+ MutableRecord(40, 1))
+
+ firstSnapshot.counters(UserCounter("counter")).count should be(17)
+
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).min shouldBe (0)
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).max shouldBe (43)
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).numberOfMeasurements should be(3)
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).recordsIterator.toStream should contain allOf (
+ MutableRecord(0, 1), // min
+ MutableRecord(42, 1), // current
+ MutableRecord(43, 1)) // max
+
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).min shouldBe (0)
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).max shouldBe (0)
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).numberOfMeasurements should be(3)
+ firstSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).recordsIterator.toStream should contain only (
+ MutableRecord(0, 3)) // min, max and current
+
+ firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).min shouldBe (15)
+ firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).max shouldBe (15)
+ firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).numberOfMeasurements should be(1)
+ firstSnapshot.gauges(UserGauge("gauge-with-default-configuration")).recordsIterator.toStream should contain only (
+ MutableRecord(15, 1)) // only the manually recorded value
+
+ val secondSnapshot = userMetricsRecorder.collect(context)
+
+ secondSnapshot.histograms.size should be(2)
+ secondSnapshot.histograms.keys should contain allOf (
+ UserHistogram("histogram-with-settings"),
+ UserHistogram("histogram-with-default-configuration"))
+
+ secondSnapshot.histograms(UserHistogram("histogram-with-settings")).min shouldBe (0)
+ secondSnapshot.histograms(UserHistogram("histogram-with-settings")).max shouldBe (0)
+ secondSnapshot.histograms(UserHistogram("histogram-with-settings")).numberOfMeasurements should be(0)
+ secondSnapshot.histograms(UserHistogram("histogram-with-settings")).recordsIterator.toStream shouldBe empty
+
+ secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).min shouldBe (0)
+ secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).max shouldBe (0)
+ secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).numberOfMeasurements should be(0)
+ secondSnapshot.histograms(UserHistogram("histogram-with-default-configuration")).recordsIterator.toStream shouldBe empty
+
+ secondSnapshot.counters(UserCounter("counter")).count should be(0)
+
+ secondSnapshot.minMaxCounters.size should be(2)
+ secondSnapshot.minMaxCounters.keys should contain allOf (
+ UserMinMaxCounter("min-max-counter-with-settings"),
+ UserMinMaxCounter("min-max-counter-with-default-configuration"))
+
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).min shouldBe (42)
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).max shouldBe (42)
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).numberOfMeasurements should be(3)
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-settings")).recordsIterator.toStream should contain only (
+ MutableRecord(42, 3)) // min, max and current
+
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).min shouldBe (0)
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).max shouldBe (0)
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).numberOfMeasurements should be(3)
+ secondSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-with-default-configuration")).recordsIterator.toStream should contain only (
+ MutableRecord(0, 3)) // min, max and current
+
+ secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).min shouldBe (0)
+ secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).max shouldBe (0)
+ secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).numberOfMeasurements should be(0)
+ secondSnapshot.gauges(UserGauge("gauge-with-default-configuration")).recordsIterator shouldBe empty
+
+ }
+
+ "generate a snapshot that can be merged with another" in {
+ val context = CollectionContext.default
+ val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get
+
+ val histogram = Kamon(UserMetrics).registerHistogram("histogram-for-merge")
+ val counter = Kamon(UserMetrics).registerCounter("counter-for-merge")
+ val minMaxCounter = Kamon(UserMetrics).registerMinMaxCounter("min-max-counter-for-merge")
+ val gauge = Kamon(UserMetrics).registerGauge("gauge-for-merge") { () ⇒ 10L }
+
+ histogram.record(100)
+ counter.increment(10)
+ minMaxCounter.increment(50)
+ minMaxCounter.decrement(10)
+ gauge.record(50)
+
+ val firstSnapshot = userMetricsRecorder.collect(context)
+
+ val extraCounter = Kamon(UserMetrics).registerCounter("extra-counter")
+ histogram.record(200)
+ extraCounter.increment(20)
+ minMaxCounter.increment(40)
+ minMaxCounter.decrement(50)
+ gauge.record(70)
+
+ val secondSnapshot = userMetricsRecorder.collect(context)
+ val mergedSnapshot = firstSnapshot.merge(secondSnapshot, context)
+
+ mergedSnapshot.histograms.keys should contain(UserHistogram("histogram-for-merge"))
+
+ mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).min shouldBe (100)
+ mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).max shouldBe (200)
+ mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).numberOfMeasurements should be(2)
+ mergedSnapshot.histograms(UserHistogram("histogram-for-merge")).recordsIterator.toStream should contain allOf (
+ MutableRecord(100, 1),
+ MutableRecord(200, 1))
+
+ mergedSnapshot.counters(UserCounter("counter-for-merge")).count should be(10)
+ mergedSnapshot.counters(UserCounter("extra-counter")).count should be(20)
+
+ mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).min shouldBe (0)
+ mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).max shouldBe (80)
+ mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).numberOfMeasurements should be(6)
+ mergedSnapshot.minMaxCounters(UserMinMaxCounter("min-max-counter-for-merge")).recordsIterator.toStream should contain allOf (
+ MutableRecord(0, 1), // min in first snapshot
+ MutableRecord(30, 2), // min and current in second snapshot
+ MutableRecord(40, 1), // current in first snapshot
+ MutableRecord(50, 1), // max in first snapshot
+ MutableRecord(80, 1)) // max in second snapshot
+
+ mergedSnapshot.gauges(UserGauge("gauge-for-merge")).min shouldBe (50)
+ mergedSnapshot.gauges(UserGauge("gauge-for-merge")).max shouldBe (70)
+ mergedSnapshot.gauges(UserGauge("gauge-for-merge")).numberOfMeasurements should be(2)
+ mergedSnapshot.gauges(UserGauge("gauge-for-merge")).recordsIterator.toStream should contain allOf (
+ MutableRecord(50, 1),
+ MutableRecord(70, 1))
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala
new file mode 100644
index 00000000..1a93e1f6
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/CounterSpec.scala
@@ -0,0 +1,55 @@
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+
+import kamon.metric.CollectionContext
+import org.scalatest.{ Matchers, WordSpec }
+
+class CounterSpec extends WordSpec with Matchers {
+
+ "a Counter" should {
+ "allow increment only operations" in new CounterFixture {
+ counter.increment()
+ counter.increment(10)
+
+ intercept[UnsupportedOperationException] {
+ counter.increment(-10)
+ }
+ }
+
+ "reset to zero when a snapshot is taken" in new CounterFixture {
+ counter.increment(100)
+ takeSnapshotFrom(counter).count should be(100)
+ takeSnapshotFrom(counter).count should be(0)
+ takeSnapshotFrom(counter).count should be(0)
+
+ counter.increment(50)
+ takeSnapshotFrom(counter).count should be(50)
+ takeSnapshotFrom(counter).count should be(0)
+ }
+
+ "produce a snapshot that can be merged with others" in new CounterFixture {
+ val counterA = Counter()
+ val counterB = Counter()
+ counterA.increment(100)
+ counterB.increment(200)
+
+ val counterASnapshot = takeSnapshotFrom(counterA)
+ val counterBSnapshot = takeSnapshotFrom(counterB)
+
+ counterASnapshot.merge(counterBSnapshot, collectionContext).count should be(300)
+ counterBSnapshot.merge(counterASnapshot, collectionContext).count should be(300)
+ }
+
+ }
+
+ trait CounterFixture {
+ val counter = Counter()
+
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(1)
+ }
+
+ def takeSnapshotFrom(counter: Counter): Counter.Snapshot = counter.collect(collectionContext)
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
new file mode 100644
index 00000000..b3ff3c9f
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
@@ -0,0 +1,70 @@
+package kamon.metric.instrument
+
+import java.util.concurrent.atomic.AtomicLong
+
+import akka.actor.ActorSystem
+import com.typesafe.config.ConfigFactory
+import kamon.metric.{ Scale, CollectionContext }
+import org.scalatest.{ Matchers, WordSpecLike }
+import scala.concurrent.duration._
+
+class GaugeSpec extends WordSpecLike with Matchers {
+ val system = ActorSystem("gauge-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | flush-interval = 1 hour
+ | precision {
+ | default-gauge-precision {
+ | refresh-interval = 100 milliseconds
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ "a Gauge" should {
+ "automatically record the current value using the configured refresh-interval" in {
+ val numberOfValuesRecorded = new AtomicLong(0)
+ val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) }
+
+ Thread.sleep(1.second.toMillis)
+ numberOfValuesRecorded.get() should be(10L +- 1L)
+ gauge.cleanup
+ }
+
+ "stop automatically recording after a call to cleanup" in {
+ val numberOfValuesRecorded = new AtomicLong(0)
+ val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) }
+
+ Thread.sleep(1.second.toMillis)
+ gauge.cleanup
+ numberOfValuesRecorded.get() should be(10L +- 1L)
+ Thread.sleep(1.second.toMillis)
+ numberOfValuesRecorded.get() should be(10L +- 1L)
+ }
+
+ "produce a Histogram snapshot including all the recorded values" in {
+ val numberOfValuesRecorded = new AtomicLong(0)
+ val gauge = Gauge.fromDefaultConfig(system) { () ⇒ numberOfValuesRecorded.addAndGet(1) }
+
+ Thread.sleep(1.second.toMillis)
+ gauge.cleanup
+ val snapshot = gauge.collect(CollectionContext.default)
+
+ snapshot.numberOfMeasurements should be(10L +- 1L)
+ snapshot.min should be(1)
+ snapshot.max should be(10L +- 1L)
+ }
+
+ "not record the current value when doing a collection" in {
+ val numberOfValuesRecorded = new AtomicLong(0)
+ val gauge = Gauge(Histogram.Precision.Normal, 10000L, Scale.Unit, 1 hour, system)(() ⇒ numberOfValuesRecorded.addAndGet(1))
+
+ val snapshot = gauge.collect(CollectionContext.default)
+
+ snapshot.numberOfMeasurements should be(0)
+ numberOfValuesRecorded.get() should be(0)
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala
new file mode 100644
index 00000000..cefdf0f4
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/HistogramSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+
+import com.typesafe.config.ConfigFactory
+import kamon.metric.CollectionContext
+import org.scalatest.{ Matchers, WordSpec }
+
+import scala.util.Random
+
+class HistogramSpec extends WordSpec with Matchers {
+
+ val histogramConfig = ConfigFactory.parseString(
+ """
+ |
+ |highest-trackable-value = 100000
+ |significant-value-digits = 2
+ |
+ """.stripMargin)
+
+ "a Histogram" should {
+ "allow record values within the configured range" in new HistogramFixture {
+ histogram.record(1000)
+ histogram.record(5000, count = 100)
+ histogram.record(10000)
+ }
+
+ "fail when recording values higher than the highest trackable value" in new HistogramFixture {
+ intercept[IndexOutOfBoundsException] {
+ histogram.record(1000000)
+ }
+ }
+
+ "reset all recorded levels to zero after a snapshot collection" in new HistogramFixture {
+ histogram.record(100)
+ histogram.record(200)
+ histogram.record(300)
+
+ takeSnapshot().numberOfMeasurements should be(3)
+ takeSnapshot().numberOfMeasurements should be(0)
+ }
+
+ "produce a snapshot" which {
+ "supports min, max and numberOfMeasurements operations" in new HistogramFixture {
+ histogram.record(100)
+ histogram.record(200, count = 200)
+ histogram.record(300)
+ histogram.record(900)
+
+ val snapshot = takeSnapshot()
+
+ snapshot.min should equal(100L +- 1L)
+ snapshot.max should equal(900L +- 9L)
+ snapshot.numberOfMeasurements should be(203)
+ }
+
+ "can be merged with another snapshot" in new MultipleHistogramFixture {
+ val random = new Random(System.nanoTime())
+
+ for (repetitions ← 1 to 1000) {
+ // Put some values on A and Control
+ for (_ ← 1 to 1000) {
+ val newRecording = random.nextInt(100000)
+ controlHistogram.record(newRecording)
+ histogramA.record(newRecording)
+ }
+
+ // Put some values on B and Control
+ for (_ ← 1 to 2000) {
+ val newRecording = random.nextInt(100000)
+ controlHistogram.record(newRecording)
+ histogramB.record(newRecording)
+ }
+
+ val controlSnapshot = takeSnapshotFrom(controlHistogram)
+ val histogramASnapshot = takeSnapshotFrom(histogramA)
+ val histogramBSnapshot = takeSnapshotFrom(histogramB)
+
+ assertEquals(controlSnapshot, histogramASnapshot.merge(histogramBSnapshot, collectionContext))
+ assertEquals(controlSnapshot, histogramBSnapshot.merge(histogramASnapshot, collectionContext))
+ }
+ }
+ }
+ }
+
+ trait HistogramFixture {
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
+ }
+
+ val histogram = Histogram.fromConfig(histogramConfig)
+
+ def takeSnapshot(): Histogram.Snapshot = histogram.collect(collectionContext)
+ }
+
+ trait MultipleHistogramFixture {
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
+ }
+
+ val controlHistogram = Histogram.fromConfig(histogramConfig)
+ val histogramA = Histogram.fromConfig(histogramConfig)
+ val histogramB = Histogram.fromConfig(histogramConfig)
+
+ def takeSnapshotFrom(histogram: Histogram): Histogram.Snapshot = histogram.collect(collectionContext)
+
+ def assertEquals(left: Histogram.Snapshot, right: Histogram.Snapshot): Unit = {
+ left.numberOfMeasurements should equal(right.numberOfMeasurements)
+ left.min should equal(right.min)
+ left.max should equal(right.max)
+ left.recordsIterator.toStream should contain theSameElementsAs (right.recordsIterator.toStream)
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
new file mode 100644
index 00000000..cb03664c
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
@@ -0,0 +1,108 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+
+import akka.actor.ActorSystem
+import com.typesafe.config.ConfigFactory
+import kamon.metric.CollectionContext
+import kamon.metric.instrument.Histogram.MutableRecord
+import org.scalatest.{ Matchers, WordSpecLike }
+
+class MinMaxCounterSpec extends WordSpecLike with Matchers {
+ val system = ActorSystem("min-max-counter-spec")
+ val minMaxCounterConfig = ConfigFactory.parseString(
+ """
+ |refresh-interval = 1 hour
+ |highest-trackable-value = 1000
+ |significant-value-digits = 2
+ """.stripMargin)
+
+ "the MinMaxCounter" should {
+ "track ascending tendencies" in new MinMaxCounterFixture {
+ mmCounter.increment()
+ mmCounter.increment(3)
+ mmCounter.increment()
+
+ val snapshot = collectCounterSnapshot()
+
+ snapshot.min should be(0)
+ snapshot.max should be(5)
+ snapshot.recordsIterator.toStream should contain allOf (
+ MutableRecord(0, 1), // min
+ MutableRecord(5, 2)) // max and current
+ }
+
+ "track descending tendencies" in new MinMaxCounterFixture {
+ mmCounter.increment(5)
+ mmCounter.decrement()
+ mmCounter.decrement(3)
+ mmCounter.decrement()
+
+ val snapshot = collectCounterSnapshot()
+
+ snapshot.min should be(0)
+ snapshot.max should be(5)
+ snapshot.recordsIterator.toStream should contain allOf (
+ MutableRecord(0, 2), // min and current
+ MutableRecord(5, 1)) // max
+ }
+
+ "reset the min and max to the current value after taking a snapshot" in new MinMaxCounterFixture {
+ mmCounter.increment(5)
+ mmCounter.decrement(3)
+
+ val firstSnapshot = collectCounterSnapshot()
+
+ firstSnapshot.min should be(0)
+ firstSnapshot.max should be(5)
+ firstSnapshot.recordsIterator.toStream should contain allOf (
+ MutableRecord(0, 1), // min
+ MutableRecord(2, 1), // current
+ MutableRecord(5, 1)) // max
+
+ val secondSnapshot = collectCounterSnapshot()
+
+ secondSnapshot.min should be(2)
+ secondSnapshot.max should be(2)
+ secondSnapshot.recordsIterator.toStream should contain(
+ MutableRecord(2, 3)) // min, max and current
+ }
+
+ "report zero as the min and current values if they current value fell bellow zero" in new MinMaxCounterFixture {
+ mmCounter.decrement(3)
+
+ val snapshot = collectCounterSnapshot()
+
+ snapshot.min should be(0)
+ snapshot.max should be(0)
+ snapshot.recordsIterator.toStream should contain(
+ MutableRecord(0, 3)) // min, max and current (even while current really is -3
+ }
+ }
+
+ trait MinMaxCounterFixture {
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(64)
+ }
+
+ val mmCounter = MinMaxCounter.fromConfig(minMaxCounterConfig, system).asInstanceOf[PaddedMinMaxCounter]
+ mmCounter.cleanup // cancel the refresh schedule
+
+ def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext)
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
deleted file mode 100644
index 645ca96a..00000000
--- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import org.scalatest.{ WordSpecLike, Matchers }
-import akka.testkit.{ TestProbe, TestKitBase }
-import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
-import com.typesafe.config.ConfigFactory
-import scala.concurrent.duration._
-import kamon.Kamon
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.ActorMetrics.ActorMetricSnapshot
-import kamon.metrics.MetricSnapshot.Measurement
-
-class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
- implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | filters = [
- | {
- | actor {
- | includes = [ "user/tracked-*" ]
- | excludes = [ "user/tracked-explicitly-excluded"]
- | }
- | }
- | ]
- |}
- """.stripMargin))
-
- "the Kamon actor metrics" should {
- "respect the configured include and exclude filters" in new DelayableActorFixture {
- val tracked = system.actorOf(Props[DelayableActor], "tracked-actor")
- val nonTracked = system.actorOf(Props[DelayableActor], "non-tracked-actor")
- val trackedExplicitlyExcluded = system.actorOf(Props[DelayableActor], "tracked-explicitly-excluded")
-
- Kamon(Metrics).subscribe(ActorMetrics, "*", testActor, permanently = true)
- expectMsgType[TickMetricSnapshot]
-
- tracked ! Discard
- nonTracked ! Discard
- trackedExplicitlyExcluded ! Discard
-
- within(2 seconds) {
- val tickSnapshot = expectMsgType[TickMetricSnapshot]
- tickSnapshot.metrics.keys should contain(ActorMetrics("user/tracked-actor"))
- tickSnapshot.metrics.keys should not contain (ActorMetrics("user/non-tracked-actor"))
- tickSnapshot.metrics.keys should not contain (ActorMetrics("user/tracked-explicitly-excluded"))
- }
- }
-
- "record mailbox-size, processing-time and time-in-mailbox metrics under regular conditions" in new DelayableActorFixture {
- val (delayable, metricsListener) = delayableActor("tracked-normal-conditions")
-
- for (_ ← 1 to 10) {
- delayable ! Discard
- }
-
- val actorMetrics = expectActorMetrics("user/tracked-normal-conditions", metricsListener, 3 seconds)
- actorMetrics.mailboxSize.max should be <= 10L
- actorMetrics.processingTime.numberOfMeasurements should be(10L)
- actorMetrics.timeInMailbox.numberOfMeasurements should be(10L)
- }
-
- "keep a correct mailbox-size even if the actor is blocked processing a message" in new DelayableActorFixture {
- val (delayable, metricsListener) = delayableActor("tracked-mailbox-size-queueing-up")
-
- delayable ! Delay(2500 milliseconds)
- for (_ ← 1 to 9) {
- delayable ! Discard
- }
-
- // let the first snapshot pass
- metricsListener.expectMsgType[TickMetricSnapshot]
-
- // process the tick in which the actor is stalled.
- val stalledTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2 seconds)
- stalledTickMetrics.mailboxSize.numberOfMeasurements should equal(30)
- // only the automatic last-value recording should be taken, and includes the message being currently processed.
- stalledTickMetrics.mailboxSize.measurements should contain only (Measurement(10, 30))
- stalledTickMetrics.mailboxSize.min should equal(10)
- stalledTickMetrics.mailboxSize.max should equal(10)
- stalledTickMetrics.processingTime.numberOfMeasurements should be(0L)
- stalledTickMetrics.timeInMailbox.numberOfMeasurements should be(0L)
-
- // process the tick after the actor is unblocked.
- val afterStallTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2 seconds)
- afterStallTickMetrics.processingTime.numberOfMeasurements should be(10L)
- afterStallTickMetrics.timeInMailbox.numberOfMeasurements should be(10L)
- afterStallTickMetrics.processingTime.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
- afterStallTickMetrics.timeInMailbox.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
- }
-
- "track the number of errors" in new ErrorActorFixture {
- val (error, metricsListener) = failedActor("tracked-errors")
-
- for (_ ← 1 to 5) {
- error ! Error
- }
-
- val actorMetrics = expectActorMetrics("user/tracked-errors", metricsListener, 3 seconds)
- actorMetrics.errorCounter.numberOfMeasurements should be(5L)
- }
- }
-
- def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = {
- val tickSnapshot = within(waitTime) {
- listener.expectMsgType[TickMetricSnapshot]
- }
- val actorMetricsOption = tickSnapshot.metrics.get(ActorMetrics(actorPath))
- actorMetricsOption should not be empty
- actorMetricsOption.get.asInstanceOf[ActorMetricSnapshot]
- }
-
- trait DelayableActorFixture {
- def delayableActor(name: String): (ActorRef, TestProbe) = {
- val actor = system.actorOf(Props[DelayableActor], name)
- val metricsListener = TestProbe()
-
- Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true)
- // Wait for one empty snapshot before proceeding to the test.
- metricsListener.expectMsgType[TickMetricSnapshot]
-
- (actor, metricsListener)
- }
- }
-
- trait ErrorActorFixture {
- def failedActor(name: String): (ActorRef, TestProbe) = {
- val actor = system.actorOf(Props[FailedActor], name)
- val metricsListener = TestProbe()
-
- Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true)
- // Wait for one empty snapshot before proceeding to the test.
- metricsListener.expectMsgType[TickMetricSnapshot]
-
- (actor, metricsListener)
- }
- }
-}
-
-class DelayableActor extends Actor {
- def receive = {
- case Delay(time) ⇒ Thread.sleep(time.toMillis)
- case Discard ⇒
- }
-}
-
-class FailedActor extends Actor {
- def receive = {
- case Error ⇒ 1 / 0
- case Discard ⇒
- }
-}
-
-case object Discard
-
-case class Delay(time: FiniteDuration)
-
-case class Error()
diff --git a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala b/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala
deleted file mode 100644
index 1e072f71..00000000
--- a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import akka.testkit.TestKitBase
-import org.scalatest.{ Matchers, WordSpecLike }
-import akka.actor.ActorSystem
-import scala.concurrent.duration._
-import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.MetricSnapshot.Measurement
-
-class CustomMetricSpec extends TestKitBase with WordSpecLike with Matchers {
- implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | filters = [
- | {
- | custom-metric {
- | includes = [ "test/*" ]
- | excludes = [ ]
- | }
- | }
- | ]
- |}
- """.stripMargin))
-
- "the Kamon custom metrics support" should {
- "allow registering a custom metric with the Metrics extension" in {
- val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit))
-
- recorder should be('defined)
- }
-
- "allow subscriptions to custom metrics using the default subscription protocol" in {
- val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit))
-
- recorder.map { r ⇒
- r.record(100)
- r.record(15)
- r.record(0)
- r.record(50)
- }
-
- Kamon(Metrics).subscribe(CustomMetric, "test/sample-counter", testActor)
-
- val recordedValues = within(5 seconds) {
- val snapshot = expectMsgType[TickMetricSnapshot]
- snapshot.metrics(CustomMetric("test/sample-counter")).metrics(CustomMetric.RecordedValues)
- }
-
- recordedValues.min should equal(0)
- recordedValues.max should equal(100)
- recordedValues.numberOfMeasurements should equal(4)
- recordedValues.measurements should contain allOf (
- Measurement(0, 1),
- Measurement(15, 1),
- Measurement(50, 1),
- Measurement(100, 1))
- }
- }
-
-}
diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
deleted file mode 100644
index 4d6ebc49..00000000
--- a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import org.scalatest.{ Matchers, WordSpec }
-import kamon.metrics.MetricSnapshot.Measurement
-
-class MetricSnapshotSpec extends WordSpec with Matchers {
-
- "a metric snapshot" should {
- "support a max operation" in new SnapshotFixtures {
- snapshotA.max should be(17)
- snapshotB.max should be(10)
- snapshotC.max should be(1)
- }
-
- "support a min operation" in new SnapshotFixtures {
- snapshotA.min should be(1)
- snapshotB.min should be(2)
- snapshotC.min should be(1)
- }
-
- "be able to merge with other snapshot" in new SnapshotFixtures {
- val merged = snapshotA.merge(snapshotB).merge(snapshotC)
-
- merged.min should be(1)
- merged.max should be(17)
- merged.numberOfMeasurements should be(300)
- merged.measurements.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17)
- }
-
- "be able to merge with empty snapshots" in new SnapshotFixtures {
- snapshotA.merge(emptySnapshot) should be(snapshotA)
- emptySnapshot.merge(snapshotA).merge(emptySnapshot) should be(snapshotA)
- snapshotC.merge(emptySnapshot) should be(snapshotC)
- }
-
- }
-
- trait SnapshotFixtures {
- val emptySnapshot = MetricSnapshot(InstrumentTypes.Histogram, 0, Scale.Unit, Vector.empty)
-
- val snapshotA = MetricSnapshot(InstrumentTypes.Histogram, 100, Scale.Unit, Vector(
- Measurement(1, 3),
- Measurement(2, 15),
- Measurement(5, 68),
- Measurement(7, 13),
- Measurement(17, 1)))
-
- val snapshotB = MetricSnapshot(InstrumentTypes.Histogram, 100, Scale.Unit, Vector(
- Measurement(2, 6),
- Measurement(4, 48),
- Measurement(5, 39),
- Measurement(10, 7)))
-
- val snapshotC = MetricSnapshot(InstrumentTypes.Counter, 100, Scale.Unit, Vector(Measurement(1, 100)))
- }
-}
diff --git a/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala
deleted file mode 100644
index 14f1573f..00000000
--- a/kamon-core/src/test/scala/kamon/metrics/instrument/MinMaxCounterSpec.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon.metrics.instrument
-
-import org.scalatest.{ Matchers, WordSpecLike }
-import kamon.metrics.instruments.MinMaxCounter
-import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
-
-class MinMaxCounterSpec extends WordSpecLike with Matchers {
-
- "the MinMaxCounter" should {
- "increment" in {
- val counter = MinMaxCounter()
-
- counter.increment()
- counter.increment()
- counter.increment()
- counter.increment()
- counter.increment()
-
- val CounterMeasurement(_, _, current) = counter.collect()
-
- current should be(5)
- }
-
- "decrement" in {
- val counter = MinMaxCounter()
- counter.increment(5L)
-
- counter.decrement()
- counter.decrement()
- counter.decrement()
- counter.decrement()
- counter.decrement()
-
- val CounterMeasurement(_, _, current) = counter.collect()
-
- current should be(0)
- }
-
- "reset the min and max with the sum value when the collect method is called" in {
- val counter = MinMaxCounter()
-
- counter.increment(10)
- counter.increment(20)
- counter.increment(30)
- counter.increment(40)
- counter.increment(50)
-
- counter.collect() //only for check the last value after reset min max
-
- val CounterMeasurement(min, max, current) = counter.collect()
-
- min should be(current)
- max should be(current)
- current should be(150)
- }
- }
-
- "track the min value" in {
- val counter = MinMaxCounter()
-
- counter.increment(10)
- counter.increment(20)
- counter.increment(30)
- counter.increment(40)
- counter.increment(50)
-
- val CounterMeasurement(min, _, _) = counter.collect()
-
- min should be(0)
-
- counter.increment(50)
-
- val CounterMeasurement(minAfterCollectAndAddSomeValues, _, _) = counter.collect()
-
- minAfterCollectAndAddSomeValues should be(150)
- }
-
- "track the max value" in {
- val counter = MinMaxCounter()
- counter.increment(10)
- counter.increment(20)
- counter.increment(30)
- counter.increment(40)
- counter.increment(50)
-
- val CounterMeasurement(_, max, _) = counter.collect()
-
- max should be(150)
-
- counter.increment(200)
-
- val CounterMeasurement(_, maxAfterCollectAndAddSomeValues, _) = counter.collect()
-
- maxAfterCollectAndAddSomeValues should be(350)
- }
-}
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
new file mode 100644
index 00000000..4d0049f1
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/TraceContextManipulationSpec.scala
@@ -0,0 +1,95 @@
+package kamon.trace
+
+import akka.actor.ActorSystem
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.trace.TraceContext.SegmentIdentity
+import org.scalatest.{ Matchers, WordSpecLike }
+
+class TraceContextManipulationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("trace-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 hour
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ | ]
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 3600000000000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-min-max-counter-precision {
+ | refresh-interval = 1 second
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ "the TraceRecorder api" should {
+ "allow starting a trace within a specified block of code, and only within that block of code" in {
+ val createdContext = TraceRecorder.withNewTraceContext("start-context") {
+ TraceRecorder.currentContext should not be empty
+ TraceRecorder.currentContext.get
+ }
+
+ TraceRecorder.currentContext shouldBe empty
+ createdContext.name shouldBe ("start-context")
+ }
+
+ "allow starting a trace within a specified block of code, providing a trace-token and only within that block of code" in {
+ val createdContext = TraceRecorder.withNewTraceContext("start-context-with-token", Some("token-1")) {
+ TraceRecorder.currentContext should not be empty
+ TraceRecorder.currentContext.get
+ }
+
+ TraceRecorder.currentContext shouldBe empty
+ createdContext.name shouldBe ("start-context-with-token")
+ createdContext.token should be("token-1")
+ }
+
+ "allow providing a TraceContext and make it available within a block of code" in {
+ val createdContext = TraceRecorder.withNewTraceContext("manually-provided-trace-context") { TraceRecorder.currentContext }
+
+ TraceRecorder.currentContext shouldBe empty
+ TraceRecorder.withTraceContext(createdContext) {
+ TraceRecorder.currentContext should be(createdContext)
+ }
+
+ TraceRecorder.currentContext shouldBe empty
+ }
+
+ "allow renaming a trace" in {
+ val createdContext = TraceRecorder.withNewTraceContext("trace-before-rename") {
+ TraceRecorder.rename("renamed-trace")
+ TraceRecorder.currentContext.get
+ }
+
+ TraceRecorder.currentContext shouldBe empty
+ createdContext.name shouldBe ("renamed-trace")
+ }
+
+ "allow creating a segment within a trace" in {
+ val createdContext = TraceRecorder.withNewTraceContext("trace-with-segments") {
+ val segmentHandle = TraceRecorder.startSegment(TraceManipulationTestSegment("segment-1"))
+
+ TraceRecorder.currentContext.get
+ }
+
+ TraceRecorder.currentContext shouldBe empty
+ createdContext.name shouldBe ("trace-with-segments")
+
+ }
+ }
+
+ case class TraceManipulationTestSegment(name: String) extends SegmentIdentity
+
+}
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
index 15d5d3fe..b4358ce7 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
@@ -18,7 +18,7 @@ package kamon.datadog
import akka.actor._
import kamon.Kamon
-import kamon.metrics._
+import kamon.metric._
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import com.typesafe.config.Config
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index 028e9608..17e19d0b 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -20,11 +20,10 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
+import kamon.metric.Subscriptions.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
-import kamon.metrics.{ MetricIdentity, MetricGroupIdentity }
+import kamon.metric.instrument.{ Counter, Histogram }
+import kamon.metric.{ MetricIdentity, MetricGroupIdentity }
import java.util.Locale
class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
@@ -50,7 +49,7 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
+ val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
for {
(groupIdentity, groupSnapshot) ← tick.metrics
@@ -59,33 +58,35 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
val key = buildMetricName(groupIdentity, metricIdentity)
- for (measurement ← metricSnapshot.measurements) {
- val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType)
- dataBuilder.appendMeasurement(key, measurementData)
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ hs.recordsIterator.foreach { record ⇒
+ val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDTimer(record.level, record.count))
+ packetBuilder.appendMeasurement(key, measurementData)
+
+ }
+
+ case cs: Counter.Snapshot ⇒
+ val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDCounter(cs.count))
+ packetBuilder.appendMeasurement(key, measurementData)
}
}
- dataBuilder.flush()
+ packetBuilder.flush()
}
- def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement,
- instrumentType: InstrumentType): String = {
-
- StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType))
+ def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurementData: String): String =
+ StringBuilder.newBuilder
+ .append(measurementData)
.append(buildIdentificationTag(groupIdentity, metricIdentity))
.result()
- }
- def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = {
- def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String =
- s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}"
-
- instrumentType match {
- case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
- }
+ def encodeStatsDTimer(level: Long, count: Long): String = {
+ val samplingRate: Double = 1D / count
+ level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}
+ def encodeStatsDCounter(count: Long): String = count.toString + "|c"
+
def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
s"$appName.${groupIdentity.category.name}.${metricIdentity.name}"
diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
index 6a7191a1..cb82c362 100644
--- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
+++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
@@ -16,14 +16,14 @@
package kamon.datadog
-import akka.testkit.{TestKitBase, TestProbe}
-import akka.actor.{Props, ActorRef, ActorSystem}
-import kamon.metrics.instruments.CounterRecorder
-import org.scalatest.{Matchers, WordSpecLike}
-import kamon.metrics._
+import akka.testkit.{ TestKitBase, TestProbe }
+import akka.actor.{ Props, ActorRef, ActorSystem }
+import kamon.metric.instrument.Histogram.Precision
+import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter }
+import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric._
import akka.io.Udp
-import org.HdrHistogram.HdrRecorder
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.Subscriptions.TickMetricSnapshot
import java.lang.management.ManagementFactory
import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
@@ -32,13 +32,15 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
implicit lazy val system = ActorSystem("datadog-metric-sender-spec",
ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes"))
+ val context = CollectionContext.default
+
"the DataDogMetricSender" should {
"send latency measurements" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon")
@@ -46,11 +48,11 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
"include the sampling rate in case of multiple measurements of the same value" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon")
@@ -58,7 +60,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit)
+ val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit)
var bytes = 0
var level = 0
@@ -69,8 +71,8 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
- udp.expectMsgType[Udp.Send]// let the first flush pass
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon")
@@ -81,24 +83,21 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
val secondTestMetricName = "processing-time-2"
val thirdTestMetricName = "counter"
- val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
- val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
- val thirdTestRecorder = CounterRecorder()
+ val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
+ val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
+ val thirdTestRecorder = Counter()
firstTestRecorder.record(10L)
firstTestRecorder.record(10L)
secondTestRecorder.record(21L)
- thirdTestRecorder.record(1L)
- thirdTestRecorder.record(1L)
- thirdTestRecorder.record(1L)
- thirdTestRecorder.record(1L)
+ thirdTestRecorder.increment(4L)
val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(),
- secondTestMetricName -> secondTestRecorder.collect(),
- thirdTestMetricName -> thirdTestRecorder.collect()))
+ firstTestMetricName -> firstTestRecorder.collect(context),
+ secondTestMetricName -> secondTestRecorder.collect(context),
+ thirdTestMetricName -> thirdTestRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon")
@@ -109,7 +108,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size")
- def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = {
+ def setup(metrics: Map[String, MetricSnapshot]): TestProbe = {
val udp = TestProbe()
val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) {
override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
@@ -137,7 +136,10 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
}
metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot {
- val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap
+ type GroupSnapshotType = Histogram.Snapshot
+ def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ???
+
+ val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap
}))
udp
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala
index 08e0add3..44d9c605 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetrics.scala
@@ -17,16 +17,16 @@
package kamon.newrelic
import akka.actor.Actor
-import kamon.metrics._
+import kamon.metric._
trait CustomMetrics {
self: Actor ⇒
def collectCustomMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = {
metrics.collect {
- case (CustomMetric(name), groupSnapshot) ⇒
+ case (UserMetrics, groupSnapshot) ⇒
groupSnapshot.metrics collect {
- case (_, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/$name", None, snapshot)
+ case (name, snapshot) ⇒ toNewRelicMetric(Scale.Unit)(s"Custom/$name", None, snapshot)
}
}.flatten.toSeq
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
index 46e22571..a3bb6311 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
@@ -17,7 +17,7 @@
package kamon.newrelic
import akka.actor.{ Props, ActorRef, Actor }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.Subscriptions.TickMetricSnapshot
import kamon.newrelic.MetricTranslator.TimeSliceMetrics
class MetricTranslator(receiver: ActorRef) extends Actor
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
index c195ed12..85861454 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
@@ -18,8 +18,8 @@ package kamon.newrelic
import akka.actor._
import scala.concurrent.duration._
import kamon.Kamon
-import kamon.metrics.{ CustomMetric, TickMetricSnapshotBuffer, TraceMetrics, Metrics }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.{ UserMetrics, TickMetricSnapshotBuffer, TraceMetrics, Metrics }
+import kamon.metric.Subscriptions.TickMetricSnapshot
import akka.actor
import java.util.concurrent.TimeUnit.MILLISECONDS
@@ -30,7 +30,7 @@ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val apdexT: Double = config.getDuration("apdexT", MILLISECONDS) / 1E3 // scale to seconds.
Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(CustomMetric, "*", metricsListener, permanently = true)
+ //Kamon(Metrics)(system).subscribe(UserMetrics, "*", metricsListener, permanently = true)
}
class NewRelicMetricsListener extends Actor with ActorLogging {
@@ -50,7 +50,7 @@ object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider
def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system)
case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double,
- min: Double, max: Double, sumOfSquares: Double) {
+ min: Double, max: Double, sumOfSquares: Double) {
def merge(that: Metric): Metric = {
Metric(name, scope,
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
index 90f1e8a5..38517e10 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
@@ -16,8 +16,8 @@
package kamon.newrelic
-import kamon.metrics._
-import kamon.metrics.TraceMetrics.ElapsedTime
+import kamon.metric._
+import kamon.metric.TraceMetrics.ElapsedTime
import akka.actor.Actor
import kamon.Kamon
@@ -27,6 +27,7 @@ trait WebTransactionMetrics {
def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Seq[NewRelic.Metric] = {
val apdexBuilder = new ApdexBuilder("Apdex", None, Kamon(NewRelic)(context.system).apdexT)
+ /*
// Trace metrics are recorded in nanoseconds.
var accumulatedHttpDispatcher: MetricSnapshotLike = MetricSnapshot(InstrumentTypes.Histogram, 0, Scale.Nano, Vector.empty)
@@ -46,7 +47,9 @@ trait WebTransactionMetrics {
val httpDispatcher = toNewRelicMetric(Scale.Unit)("HttpDispatcher", None, accumulatedHttpDispatcher)
val webTransaction = toNewRelicMetric(Scale.Unit)("WebTransaction", None, accumulatedHttpDispatcher)
- Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq
+ Seq(httpDispatcher, webTransaction, apdexBuilder.build) ++ webTransactionMetrics.flatten.toSeq */
+
+ ???
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
index f6e377c7..89a8b15b 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
@@ -16,26 +16,30 @@
package kamon
-import kamon.metrics.{ Scale, MetricSnapshotLike }
+import kamon.metric.instrument.{ Counter, Histogram }
+import kamon.metric.{ MetricSnapshot, Scale }
package object newrelic {
- def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshotLike): NewRelic.Metric = {
- var total: Double = 0D
- var sumOfSquares: Double = 0D
+ def toNewRelicMetric(scale: Scale)(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = {
+ snapshot match {
+ case hs: Histogram.Snapshot ⇒
+ var total: Double = 0D
+ var sumOfSquares: Double = 0D
+ val scaledMin = Scale.convert(hs.scale, scale, hs.min)
+ val scaledMax = Scale.convert(hs.scale, scale, hs.max)
- val measurementLevels = snapshot.measurements.iterator
- while (measurementLevels.hasNext) {
- val level = measurementLevels.next()
- val scaledValue = Scale.convert(snapshot.scale, scale, level.value)
+ hs.recordsIterator.foreach { record ⇒
+ val scaledValue = Scale.convert(hs.scale, scale, record.level)
- total += scaledValue * level.count
- sumOfSquares += (scaledValue * scaledValue) * level.count
- }
+ total += scaledValue * record.count
+ sumOfSquares += (scaledValue * scaledValue) * record.count
+ }
- val scaledMin = Scale.convert(snapshot.scale, scale, snapshot.min)
- val scaledMax = Scale.convert(snapshot.scale, scale, snapshot.max)
+ NewRelic.Metric(name, scope, hs.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)
- NewRelic.Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)
+ case cs: Counter.Snapshot ⇒
+ NewRelic.Metric(name, scope, cs.count, cs.count, cs.count, 0, cs.count, cs.count * cs.count)
+ }
}
}
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
index 0951d2c9..2862ba19 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
@@ -19,7 +19,7 @@ package kamon.play.instrumentation
import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect }
import org.aspectj.lang.ProceedingJoinPoint
import kamon.trace.TraceRecorder
-import kamon.metrics.TraceMetrics.HttpClientRequest
+import kamon.metric.TraceMetrics.HttpClientRequest
import play.api.libs.ws.WSRequest
import scala.concurrent.Future
import play.api.libs.ws.WSResponse
@@ -36,7 +36,7 @@ class WSInstrumentation {
def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = {
import WSInstrumentation._
- val completionHandle = TraceRecorder.startSegment(HttpClientRequest(request.url, UserTime), basicRequestAttributes(request))
+ val completionHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request))
val response = pjp.proceed().asInstanceOf[Future[WSResponse]]
@@ -50,7 +50,6 @@ class WSInstrumentation {
}
object WSInstrumentation {
- val UserTime = "UserTime"
def basicRequestAttributes(request: WSRequest): Map[String, String] = {
Map[String, String](
diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
index 0c3783bb..775d3e26 100644
--- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
@@ -28,9 +28,9 @@ import akka.testkit.{ TestKitBase, TestProbe }
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.Kamon
-import kamon.metrics.{ TraceMetrics, Metrics }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.TraceMetrics.ElapsedTime
+import kamon.metric.{ TraceMetrics, Metrics }
+import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.TraceMetrics.ElapsedTime
class WSInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with OneServerPerSuite {
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index 664bd4f9..84621927 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -26,9 +26,9 @@ import scala.util.Random
import akka.routing.RoundRobinPool
import kamon.trace.TraceRecorder
import kamon.Kamon
-import kamon.metrics._
+import kamon.metric._
import spray.http.{ StatusCodes, Uri }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.Subscriptions.TickMetricSnapshot
object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with KamonTraceDirectives {
import scala.concurrent.duration._
@@ -55,8 +55,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
val replier = system.actorOf(Props[Replier].withRouter(RoundRobinPool(nrOfInstances = 2)), "replier")
val random = new Random()
- val requestCountRecorder = Kamon(Metrics).register(CustomMetric("GetCount"), CustomMetric.histogram(10, 3, Scale.Unit))
-
startServer(interface = "localhost", port = 9090) {
get {
path("test") {
@@ -87,7 +85,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
path("ok") {
traceName("OK") {
complete {
- requestCountRecorder.map(_.record(1))
"ok"
}
}
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index d7d9cf09..d787bda4 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -21,7 +21,7 @@ import org.aspectj.lang.ProceedingJoinPoint
import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
import spray.http.HttpHeaders.{ RawHeader, Host }
import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware }
-import kamon.metrics.TraceMetrics.HttpClientRequest
+import kamon.metric.TraceMetrics.HttpClientRequest
import kamon.Kamon
import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
@@ -30,7 +30,6 @@ import akka.util.Timeout
@Aspect
class ClientRequestInstrumentation {
- import ClientRequestInstrumentation._
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
def mixin: SegmentCompletionHandleAware = SegmentCompletionHandleAware.default
@@ -51,7 +50,7 @@ class ClientRequestInstrumentation {
if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
val requestAttributes = basicRequestAttributes(request)
val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
ctx.segmentCompletionHandle = Some(completionHandle)
}
@@ -102,7 +101,7 @@ class ClientRequestInstrumentation {
if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
val requestAttributes = basicRequestAttributes(request)
val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
responseFuture.onComplete { result ⇒
completionHandle.finish(Map.empty)
@@ -139,8 +138,3 @@ class ClientRequestInstrumentation {
pjp.proceed(Array(modifiedHeaders))
}
}
-
-object ClientRequestInstrumentation {
- val SprayTime = "SprayTime"
- val UserTime = "UserTime"
-}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 9469924a..54329645 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -26,13 +26,12 @@ import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
import kamon.Kamon
-import kamon.metrics.{ TraceMetrics, Metrics }
+import kamon.metric.{ TraceMetrics, Metrics }
import spray.client.pipelining
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import spray.can.client.ClientRequestInstrumentation
+import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
import akka.pattern.pipe
-import kamon.metrics.TraceMetrics.TraceMetricSnapshot
+import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
@@ -149,7 +148,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.UserTime } map (_._2)
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
recordedSegment should not be empty
recordedSegment map { segmentMetrics ⇒
segmentMetrics.numberOfMeasurements should be(1L)
@@ -190,7 +189,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.SprayTime } map (_._2)
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
recordedSegment should not be empty
recordedSegment map { segmentMetrics ⇒
segmentMetrics.numberOfMeasurements should be(1L)
@@ -199,14 +198,14 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
}
}
- def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricSnapshot = {
+ def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricsSnapshot = {
val tickSnapshot = within(timeout) {
listener.expectMsgType[TickMetricSnapshot]
}
val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName))
metricsOption should not be empty
- metricsOption.get.asInstanceOf[TraceMetricSnapshot]
+ metricsOption.get.asInstanceOf[TraceMetricsSnapshot]
}
def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index 7edbbe11..ab9116fd 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -24,10 +24,11 @@ import kamon.Kamon
import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
import spray.http.HttpHeaders.RawHeader
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.metrics.{ TraceMetrics, Metrics }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.{ TraceMetrics, Metrics }
+import kamon.metric.Subscriptions.TickMetricSnapshot
import com.typesafe.config.ConfigFactory
-import kamon.metrics.TraceMetrics.ElapsedTime
+import kamon.metric.TraceMetrics.ElapsedTime
+import kamon.metric.instrument.Histogram
class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
with ScalaFutures with PatienceConfiguration with TestServer {
@@ -122,7 +123,7 @@ class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
traceMetrics should not be empty
traceMetrics map { metrics ⇒
- metrics(ElapsedTime).numberOfMeasurements should be(1L)
+ metrics(ElapsedTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L)
}
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
index 81242133..65506770 100644
--- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
@@ -29,11 +29,9 @@ trait TestServer {
def buildClientConnectionAndServer: (ActorRef, TestProbe) = {
val serverHandler = TestProbe()
IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
- val bound = within(10 seconds) {
- serverHandler.expectMsgType[Bound]
- }
-
+ val bound = serverHandler.expectMsgType[Bound](10 seconds)
val client = clientConnection(bound)
+
serverHandler.expectMsgType[Http.Connected]
serverHandler.reply(Http.Register(serverHandler.ref))
@@ -50,10 +48,7 @@ trait TestServer {
def buildSHostConnectorAndServer: (ActorRef, TestProbe) = {
val serverHandler = TestProbe()
IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
- val bound = within(10 seconds) {
- serverHandler.expectMsgType[Bound]
- }
-
+ val bound = serverHandler.expectMsgType[Bound](10 seconds)
val client = httpHostConnector(bound)
(client, serverHandler)
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
index 1b3daa97..dcd78f78 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
@@ -18,7 +18,7 @@ package kamon.statsd
import akka.actor._
import kamon.Kamon
-import kamon.metrics._
+import kamon.metric._
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import com.typesafe.config.Config
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index adda18cc..94bab27c 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -20,12 +20,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
+import kamon.metric.Subscriptions.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
import java.util.Locale
+import kamon.metric.instrument.{ Counter, Histogram }
+
class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
import context.system
@@ -48,7 +48,7 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long)
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
+ val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
for (
(groupIdentity, groupSnapshot) ← tick.metrics;
@@ -57,25 +57,26 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long)
val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity)
- for (measurement ← metricSnapshot.measurements) {
- val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType)
- dataBuilder.appendMeasurement(key, measurementData)
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ hs.recordsIterator.foreach { record ⇒
+ packetBuilder.appendMeasurement(key, encodeStatsDTimer(record.level, record.count))
+ }
+
+ case cs: Counter.Snapshot ⇒
+ packetBuilder.appendMeasurement(key, encodeStatsDCounter(cs.count))
}
}
- dataBuilder.flush()
+ packetBuilder.flush()
}
- def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = {
- def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String =
- value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
-
- instrumentType match {
- case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ statsDMetricFormat(measurement.count.toString, "c")
- }
+ def encodeStatsDTimer(level: Long, count: Long): String = {
+ val samplingRate: Double = 1D / count
+ level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}
+
+ def encodeStatsDCounter(count: Long): String = count.toString + "|c"
}
object StatsDMetricsSender {
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
index 9dfd05f7..19d8a80b 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -18,11 +18,12 @@ package kamon.statsd
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ ActorRef, Props, ActorSystem }
+import kamon.metric.instrument.Histogram.Precision
+import kamon.metric.instrument.{ Histogram, HdrHistogram }
import org.scalatest.{ Matchers, WordSpecLike }
-import kamon.metrics._
+import kamon.metric._
import akka.io.Udp
-import org.HdrHistogram.HdrRecorder
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.Subscriptions.TickMetricSnapshot
import java.lang.management.ManagementFactory
import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
@@ -31,14 +32,16 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
implicit lazy val system = ActorSystem("statsd-metric-sender-spec",
ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes"))
+ val context = CollectionContext.default
+
"the StatsDMetricSender" should {
"flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
val testMetricName = "test-metric"
val testMetricKey = buildMetricKey(testMetricName)
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms")
@@ -47,12 +50,12 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
"render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture {
val testMetricName = "test-metric"
val testMetricKey = buildMetricKey(testMetricName)
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
testRecorder.record(11L)
testRecorder.record(12L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms")
@@ -61,11 +64,11 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
"include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture {
val testMetricName = "test-metric"
val testMetricKey = buildMetricKey(testMetricName)
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms|@0.5")
@@ -74,7 +77,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
val testMetricName = "test-metric"
val testMetricKey = buildMetricKey(testMetricName)
- val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit)
+ val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit)
var bytes = testMetricKey.length
var level = 0
@@ -84,7 +87,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
bytes += s":$level|ms".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
@@ -97,8 +100,8 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
val secondTestMetricName = "second-test-metric"
val secondTestMetricKey = buildMetricKey(secondTestMetricName)
- val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
- val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
+ val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
firstTestRecorder.record(10L)
firstTestRecorder.record(10L)
@@ -108,8 +111,8 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
secondTestRecorder.record(21L)
val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(),
- secondTestMetricName -> secondTestRecorder.collect()))
+ firstTestMetricName -> firstTestRecorder.collect(context),
+ secondTestMetricName -> secondTestRecorder.collect(context)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms")
@@ -122,7 +125,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
def buildMetricKey(metricName: String): String = s"kamon.$localhostName.test-metric-category.test-group.$metricName"
- def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = {
+ def setup(metrics: Map[String, MetricSnapshot]): TestProbe = {
val udp = TestProbe()
val metricsSender = system.actorOf(Props(new StatsDMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) {
override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
@@ -149,7 +152,10 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
}
metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot {
- val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap
+ type GroupSnapshotType = Histogram.Snapshot
+ def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ???
+
+ val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap
}))
udp
diff --git a/project/Settings.scala b/project/Settings.scala
index 9abd5553..f2ccc32c 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -33,7 +33,7 @@ object Settings {
def formattingPreferences =
FormattingPreferences()
.setPreference(RewriteArrowSymbols, true)
- .setPreference(AlignParameters, true)
+ .setPreference(AlignParameters, false)
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentClassDeclaration, true)
} \ No newline at end of file