aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-04 01:51:58 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-04 01:51:58 -0300
commitac2367c8e80bbe6a76f36bd42de9aa9848b0e87b (patch)
tree15c333bf98222fee71d120585733b963f31ae616
parent8d46b83a5b1c45cf292f3d4d9af362c2c75fd2dc (diff)
downloadKamon-ac2367c8e80bbe6a76f36bd42de9aa9848b0e87b.tar.gz
Kamon-ac2367c8e80bbe6a76f36bd42de9aa9848b0e87b.tar.bz2
Kamon-ac2367c8e80bbe6a76f36bd42de9aa9848b0e87b.zip
+ core: introduce the new kamon.metrics.default-collection-context-buffer-size setting
-rw-r--r--kamon-core/src/main/resources/reference.conf10
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala11
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala2
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala3
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala4
-rw-r--r--kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala5
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala10
-rw-r--r--kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala15
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala17
11 files changed, 54 insertions, 35 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index b7f5c70e..3c8f3686 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -16,6 +16,16 @@ kamon {
gauge-recording-interval = 100 milliseconds
+ # Default size for the LongBuffer that gets allocated for metrics collection and merge. The
+ # value should correspond to the highest number of different buckets with values that might
+ # exist in a single histogram during a metrics collection. The default value of 33792 is a
+ # very conservative value and its equal to the total number of buckets required to cover values
+ # from 1 nanosecond to 1 hour with 0.1% precision (3 significant value digits). That means
+ # that would need to have at least one measurement on every bucket of a single histogram to
+ # fully utilize this buffer, which is *really* unlikely to ever happen. Since the buffer should
+ # be allocated once and reused it shouldn't impose a memory footprint issue.
+ default-collection-context-buffer-size = 33792
+
dispatchers {
# Dispatcher for periodical gauge value recordings.
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
index 325dd216..3761f5a5 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
@@ -38,8 +38,8 @@ trait CollectionContext {
}
object CollectionContext {
- def default: CollectionContext = new CollectionContext {
- val buffer: LongBuffer = LongBuffer.allocate(10000)
+ def apply(longBufferSize: Int): CollectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(longBufferSize)
}
}
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index 1025f0de..8c6d0359 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -57,14 +57,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
subscriptions.tell(Subscribe(category, selection, permanently), receiver)
}
- def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = {
- // TODO: Improve the way in which we are getting the context.
- val context = new CollectionContext {
- val buffer: LongBuffer = LongBuffer.allocate(50000)
- }
- (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap
- }
-
def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
import scala.concurrent.duration._
@@ -98,6 +90,9 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
allFilters.toMap
}
+
+ def buildDefaultCollectionContext: CollectionContext =
+ CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size"))
}
object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
index a9f4c721..eb2168ad 100644
--- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
@@ -30,6 +30,7 @@ class Subscriptions extends Actor {
val config = context.system.settings.config
val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
var lastTick: Long = System.currentTimeMillis()
var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
@@ -55,7 +56,7 @@ class Subscriptions extends Actor {
def flush(): Unit = {
val currentTick = System.currentTimeMillis()
- val snapshots = Kamon(Metrics).collect
+ val snapshots = collectAll()
dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)
@@ -64,6 +65,9 @@ class Subscriptions extends Actor {
subscribedForOneShot = Map.empty
}
+ def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] =
+ (for ((identity, recorder) ← Kamon(Metrics).storage) yield (identity, recorder.collect(collectionContext))).toMap
+
def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
@@ -90,7 +94,7 @@ object Subscriptions {
class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
- val collectionContext = CollectionContext.default
+ val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext
def receive = empty
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
index 9ae077f4..8c81e717 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -103,6 +103,8 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign
new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
}
+ def getCounts = countsArray().length()
+
def cleanup: Unit = {}
private def writeSnapshotTo(buffer: LongBuffer): Long = {
diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
index ee851672..0af1ae85 100644
--- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
@@ -31,6 +31,7 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma
"""
|kamon.metrics {
| tick-interval = 1 hour
+ | default-collection-context-buffer-size = 10
| filters = [
| {
| trace {
@@ -86,7 +87,7 @@ class TickMetricSnapshotBufferSpec extends TestKitBase with WordSpecLike with Ma
}
trait SnapshotFixtures {
- val collectionContext = CollectionContext.default
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
val testTraceIdentity = TraceMetrics("buffer-spec-test-trace")
val traceRecorder = Kamon(Metrics).register(testTraceIdentity, TraceMetrics.Factory).get
diff --git a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
index dab9b52a..39eaaf9e 100644
--- a/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
@@ -14,6 +14,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with
"""
|kamon.metrics {
| tick-interval = 1 hour
+ | default-collection-context-buffer-size = 10
| filters = [
| {
| trace {
@@ -87,6 +88,7 @@ class TraceMetricsSpec extends TestKitBase with WordSpecLike with Matchers with
def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
- recorder.get.collect(CollectionContext.default)
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
+ recorder.get.collect(collectionContext)
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
index 57bc3d0d..00a933f1 100644
--- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
@@ -15,6 +15,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I
"""
|kamon.metrics {
| flush-interval = 1 hour
+ | default-collection-context-buffer-size = 10
| precision {
| default-histogram-precision {
| highest-trackable-value = 10000
@@ -111,7 +112,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I
}
"generate a snapshot containing all the registered user metrics and reset all instruments" in {
- val context = CollectionContext.default
+ val context = Kamon(Metrics).buildDefaultCollectionContext
val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get
val histogramWithSettings = Kamon(UserMetrics).registerHistogram("histogram-with-settings", Histogram.Precision.Normal, 10000L)
@@ -219,7 +220,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I
}
"generate a snapshot that can be merged with another" in {
- val context = CollectionContext.default
+ val context = Kamon(Metrics).buildDefaultCollectionContext
val userMetricsRecorder = Kamon(Metrics).register(UserMetrics, UserMetrics.Factory).get
val histogram = Kamon(UserMetrics).registerHistogram("histogram-for-merge")
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
index b3ff3c9f..9192d999 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
@@ -4,15 +4,17 @@ import java.util.concurrent.atomic.AtomicLong
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
-import kamon.metric.{ Scale, CollectionContext }
+import kamon.Kamon
+import kamon.metric.{ Metrics, Scale, CollectionContext }
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.duration._
class GaugeSpec extends WordSpecLike with Matchers {
- val system = ActorSystem("gauge-spec", ConfigFactory.parseString(
+ implicit val system = ActorSystem("gauge-spec", ConfigFactory.parseString(
"""
|kamon.metrics {
| flush-interval = 1 hour
+ | default-collection-context-buffer-size = 10
| precision {
| default-gauge-precision {
| refresh-interval = 100 milliseconds
@@ -50,7 +52,7 @@ class GaugeSpec extends WordSpecLike with Matchers {
Thread.sleep(1.second.toMillis)
gauge.cleanup
- val snapshot = gauge.collect(CollectionContext.default)
+ val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext)
snapshot.numberOfMeasurements should be(10L +- 1L)
snapshot.min should be(1)
@@ -61,7 +63,7 @@ class GaugeSpec extends WordSpecLike with Matchers {
val numberOfValuesRecorded = new AtomicLong(0)
val gauge = Gauge(Histogram.Precision.Normal, 10000L, Scale.Unit, 1 hour, system)(() ⇒ numberOfValuesRecorded.addAndGet(1))
- val snapshot = gauge.collect(CollectionContext.default)
+ val snapshot = gauge.collect(Kamon(Metrics).buildDefaultCollectionContext)
snapshot.numberOfMeasurements should be(0)
numberOfValuesRecorded.get() should be(0)
diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
index cb82c362..91b503e2 100644
--- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
+++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
@@ -18,6 +18,7 @@ package kamon.datadog
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ Props, ActorRef, ActorSystem }
+import kamon.Kamon
import kamon.metric.instrument.Histogram.Precision
import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter }
import org.scalatest.{ Matchers, WordSpecLike }
@@ -32,7 +33,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
implicit lazy val system = ActorSystem("datadog-metric-sender-spec",
ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes"))
- val context = CollectionContext.default
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
"the DataDogMetricSender" should {
"send latency measurements" in new UdpListenerFixture {
@@ -40,7 +41,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon")
@@ -52,7 +53,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
testRecorder.record(10L)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon")
@@ -71,7 +72,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
@@ -95,9 +96,9 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
thirdTestRecorder.increment(4L)
val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(context),
- secondTestMetricName -> secondTestRecorder.collect(context),
- thirdTestMetricName -> thirdTestRecorder.collect(context)))
+ firstTestMetricName -> firstTestRecorder.collect(collectionContext),
+ secondTestMetricName -> secondTestRecorder.collect(collectionContext),
+ thirdTestMetricName -> thirdTestRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon")
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
index 19d8a80b..60d52491 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -18,8 +18,9 @@ package kamon.statsd
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ ActorRef, Props, ActorSystem }
+import kamon.Kamon
import kamon.metric.instrument.Histogram.Precision
-import kamon.metric.instrument.{ Histogram, HdrHistogram }
+import kamon.metric.instrument.Histogram
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.metric._
import akka.io.Udp
@@ -32,7 +33,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
implicit lazy val system = ActorSystem("statsd-metric-sender-spec",
ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes"))
- val context = CollectionContext.default
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
"the StatsDMetricSender" should {
"flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
@@ -41,7 +42,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms")
@@ -55,7 +56,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
testRecorder.record(11L)
testRecorder.record(12L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms")
@@ -68,7 +69,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
testRecorder.record(10L)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms|@0.5")
@@ -87,7 +88,7 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
bytes += s":$level|ms".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect(context)))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
@@ -111,8 +112,8 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
secondTestRecorder.record(21L)
val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(context),
- secondTestMetricName -> secondTestRecorder.collect(context)))
+ firstTestMetricName -> firstTestRecorder.collect(collectionContext),
+ secondTestMetricName -> secondTestRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms")