diff options
11 files changed, 54 insertions, 35 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index b7f5c70e..3c8f3686 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -16,6 +16,16 @@ kamon { gauge-recording-interval = 100 milliseconds + # Default size for the LongBuffer that gets allocated for metrics collection and merge. The + # value should correspond to the highest number of different buckets with values that might + # exist in a single histogram during a metrics collection. The default value of 33792 is a + # very conservative value and its equal to the total number of buckets required to cover values + # from 1 nanosecond to 1 hour with 0.1% precision (3 significant value digits). That means + # that would need to have at least one measurement on every bucket of a single histogram to + # fully utilize this buffer, which is *really* unlikely to ever happen. Since the buffer should + # be allocated once and reused it shouldn't impose a memory footprint issue. + default-collection-context-buffer-size = 33792 + dispatchers { # Dispatcher for periodical gauge value recordings. diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala index 325dd216..3761f5a5 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala @@ -38,8 +38,8 @@ trait CollectionContext { } object CollectionContext { - def default: CollectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) + def apply(longBufferSize: Int): CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(longBufferSize) } } diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index 1025f0de..8c6d0359 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -57,14 +57,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { subscriptions.tell(Subscribe(category, selection, permanently), receiver) } - def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { - // TODO: Improve the way in which we are getting the context. - val context = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(50000) - } - (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap - } - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { import scala.concurrent.duration._ @@ -98,6 +90,9 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { allFilters.toMap } + + def buildDefaultCollectionContext: CollectionContext = + CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size")) } object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala index a9f4c721..eb2168ad 100644 --- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala @@ -30,6 +30,7 @@ class Subscriptions extends Actor { val config = context.system.settings.config val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext var lastTick: Long = System.currentTimeMillis() var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty @@ -55,7 +56,7 @@ class Subscriptions extends Actor { def flush(): Unit = { val currentTick = System.currentTimeMillis() - val snapshots = Kamon(Metrics).collect + val snapshots = collectAll() dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) @@ -64,6 +65,9 @@ class Subscriptions extends Actor { subscribedForOneShot = Map.empty } + def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = + (for ((identity, recorder) ← Kamon(Metrics).storage) yield (identity, recorder.collect(collectionContext))).toMap + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { @@ -90,7 +94,7 @@ object Subscriptions { class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - val collectionContext = CollectionContext.default + val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext def receive = empty diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index 9ae077f4..8c81e717 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -103,6 +103,8 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) } + def getCounts = countsArray().length() + def cleanup: Unit = {} private def writeSnapshotTo(buffer: LongBuffer): Long = { diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala index ee851672..0af1ae85 100644 --- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala @@ -31,6 +31,7 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma """ |kamon.metrics { | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 | filters = [ | { | trace { @@ -86,7 +87,7 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma } trait SnapshotFixtures { - val collectionContext = CollectionContext.default + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext val testTraceIdentity = TraceMetrics("buffer-spec-test-trace") val traceRecorder = Kamon(Metrics).register(testTraceIdentity, TraceMetrics.Factory).get diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala index dab9b52a..39eaaf9e 100644 --- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala @@ -14,6 +14,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with """ |kamon.metrics { | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 | filters = [ | { | trace { @@ -87,6 +88,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) - recorder.get.collect(CollectionContext.default) + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + recorder.get.collect(collectionContext) } } diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala index 57bc3d0d..00a933f1 100644 --- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala @@ -15,6 +15,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I """ |kamon.metrics { | flush-interval = 1 hour + | default-collection-context-buffer-size = 10 | precision { | default-histogram-precision { | highest-trackable-value = 10000 @@ -111,7 +112,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I } "generate a snapshot containing all the registered user metrics and reset all instruments" in { - val context = CollectionContext.default + val context = Kamon(Metrics).buildDefaultCollectionContext val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get val histogramWithSettings = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L) @@ -219,7 +220,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I } "generate a snapshot that can be merged with another" in { - val context = CollectionContext.default + val context = Kamon(Metrics).buildDefaultCollectionContext val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get val histogram = Kamon(UserMetrics).registerHistogram("histogram-for-merge") diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala index b3ff3c9f..9192d999 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala @@ -4,15 +4,17 @@ import java.util.concurrent.atomic.AtomicLong import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory -import kamon.metric.{ Scale, CollectionContext } +import kamon.Kamon +import kamon.metric.{ Metrics, Scale, CollectionContext } import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.duration._ class GaugeSpec extends WordSpecLike with Matchers { - val system = ActorSystem("gauge-spec", ConfigFactory.parseString( + implicit val system = ActorSystem("gauge-spec", ConfigFactory.parseString( """ |kamon.metrics { | flush-interval = 1 hour + | default-collection-context-buffer-size = 10 | precision { | default-gauge-precision { | refresh-interval = 100 milliseconds @@ -50,7 +52,7 @@ class GaugeSpec extends WordSpecLike with Matchers { Thread.sleep(1.second.toMillis) gauge.cleanup - val snapshot = gauge.collect(CollectionContext.default) + val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext) snapshot.numberOfMeasurements should be(10L +- 1L) snapshot.min should be(1) @@ -61,7 +63,7 @@ class GaugeSpec extends WordSpecLike with Matchers { val numberOfValuesRecorded = new AtomicLong(0) val gauge = Gauge(Histogram.Precision.Normal, 10000L, Scale.Unit, 1 hour, system)(() ⇒ numberOfValuesRecorded.addAndGet(1)) - val snapshot = gauge.collect(CollectionContext.default) + val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext) snapshot.numberOfMeasurements should be(0) numberOfValuesRecorded.get() should be(0) diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala index cb82c362..91b503e2 100644 --- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -18,6 +18,7 @@ package kamon.datadog import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ Props, ActorRef, ActorSystem } +import kamon.Kamon import kamon.metric.instrument.Histogram.Precision import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter } import org.scalatest.{ Matchers, WordSpecLike } @@ -32,7 +33,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher implicit lazy val system = ActorSystem("datadog-metric-sender-spec", ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes")) - val context = CollectionContext.default + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext "the DataDogMetricSender" should { "send latency measurements" in new UdpListenerFixture { @@ -40,7 +41,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon") @@ -52,7 +53,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher testRecorder.record(10L) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon") @@ -71,7 +72,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length } - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] @@ -95,9 +96,9 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher thirdTestRecorder.increment(4L) val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(context), - secondTestMetricName -> secondTestRecorder.collect(context), - thirdTestMetricName -> thirdTestRecorder.collect(context))) + firstTestMetricName -> firstTestRecorder.collect(collectionContext), + secondTestMetricName -> secondTestRecorder.collect(collectionContext), + thirdTestMetricName -> thirdTestRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon") diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index 19d8a80b..60d52491 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -18,8 +18,9 @@ package kamon.statsd import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ ActorRef, Props, ActorSystem } +import kamon.Kamon import kamon.metric.instrument.Histogram.Precision -import kamon.metric.instrument.{ Histogram, HdrHistogram } +import kamon.metric.instrument.Histogram import org.scalatest.{ Matchers, WordSpecLike } import kamon.metric._ import akka.io.Udp @@ -32,7 +33,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers implicit lazy val system = ActorSystem("statsd-metric-sender-spec", ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes")) - val context = CollectionContext.default + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext "the StatsDMetricSender" should { "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { @@ -41,7 +42,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms") @@ -55,7 +56,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers testRecorder.record(11L) testRecorder.record(12L) - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") @@ -68,7 +69,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers testRecorder.record(10L) testRecorder.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms|@0.5") @@ -87,7 +88,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers bytes += s":$level|ms".length } - val udp = setup(Map(testMetricName -> testRecorder.collect(context))) + val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] @@ -111,8 +112,8 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers secondTestRecorder.record(21L) val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(context), - secondTestMetricName -> secondTestRecorder.collect(context))) + firstTestMetricName -> firstTestRecorder.collect(collectionContext), + secondTestMetricName -> secondTestRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms") |