aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-06-08 15:35:42 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-06-08 15:35:42 +0200
commitb94ca0cbb931799ccd2ad2d245660c4d48032c83 (patch)
treed87dc8ccfc0395cdc331b2cfb69e799a3f4d0974
parent7817621ad1a62db1bd7b7b60ed823b4da169a34a (diff)
downloadKamon-b94ca0cbb931799ccd2ad2d245660c4d48032c83.tar.gz
Kamon-b94ca0cbb931799ccd2ad2d245660c4d48032c83.tar.bz2
Kamon-b94ca0cbb931799ccd2ad2d245660c4d48032c83.zip
get a basic threading model for the span reporters
-rw-r--r--kamon-core/src/main/resources/reference.conf6
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala155
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Accumulator.scala17
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Scaler.scala (renamed from kamon-core/src/main/scala/kamon/util/MetricScaler.scala)6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala13
5 files changed, 147 insertions, 50 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index fd1c88c3..aca1a9f3 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -76,6 +76,12 @@ kamon {
}
trace {
+
+ tick-interval = 10 seconds
+
+ reporter-queue-size = 1024
+
+
# Configures a sample that decides which traces should be reported to the trace backends. The possible values are:
# - always: report all traces.
# - never: don't report any trace.
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index a22162eb..45c93ec4 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -12,13 +12,15 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Futu
import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
trait ReporterRegistry {
def loadFromConfig(): Unit
- def add(reporter: MetricsReporter): Registration
- def add(reporter: MetricsReporter, name: String): Registration
- def add(reporter: SpansReporter): Registration
+ def add(reporter: MetricReporter): Registration
+ def add(reporter: MetricReporter, name: String): Registration
+ def add(reporter: SpanReporter): Registration
+ def add(reporter: SpanReporter, name: String): Registration
def stopAll(): Future[Unit]
}
@@ -28,7 +30,7 @@ trait Registration {
def cancel(): Boolean
}
-trait MetricsReporter {
+trait MetricReporter {
def start(): Unit
def stop(): Unit
@@ -36,64 +38,86 @@ trait MetricsReporter {
def reportTickSnapshot(snapshot: TickSnapshot): Unit
}
-trait SpansReporter {
+trait SpanReporter {
def start(): Unit
def stop(): Unit
def reconfigure(config: Config): Unit
- def reportSpan(span: Span.CompletedSpan): Unit
+ def reportSpans(spans: Seq[Span.CompletedSpan]): Unit
}
class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
+ private val reporterCounter = new AtomicLong(0L)
private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry"))
private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
- private val spanReporters = new ConcurrentLinkedQueue[SpansReporter]()
- private val reporterCounter = new AtomicLong(0L)
+ private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+
+ private val metricReporters = TrieMap[Long, MetricReporterEntry]()
+ private val spanReporters = TrieMap[Long, SpanReporterEntry]()
+
reconfigure(initialConfig)
override def loadFromConfig(): Unit = ???
- override def add(reporter: MetricsReporter): Registration =
- add(reporter, reporter.getClass.getName())
+ override def add(reporter: MetricReporter): Registration =
+ addMetricReporter(reporter, reporter.getClass.getName())
+
+ override def add(reporter: MetricReporter, name: String): Registration =
+ addMetricReporter(reporter, name)
+
+ override def add(reporter: SpanReporter): Registration =
+ addSpanReporter(reporter, reporter.getClass.getName())
+
+ override def add(reporter: SpanReporter, name: String): Registration =
+ addSpanReporter(reporter, name)
- override def add(reporter: MetricsReporter, name: String): Registration = {
+
+ private def addMetricReporter(reporter: MetricReporter, name: String): Registration = {
val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new ReporterEntry(
+ val reporterEntry = new MetricReporterEntry(
id = reporterCounter.getAndIncrement(),
reporter = reporter,
executionContext = ExecutionContext.fromExecutorService(executor)
)
- metricReporters.add(reporterEntry)
-
- new Registration {
- val reporterID = reporterEntry.id
- override def cancel(): Boolean = {
- metricReporters.iterator().asScala
- .find(e => e.id == reporterID)
- .map(e => stopReporter(e))
- .isDefined
- }
- }
+ metricReporters.put(reporterEntry.id, reporterEntry)
+ createRegistration(reporterEntry.id, metricReporters)
}
- override def add(reporter: SpansReporter): Registration = {
- spanReporters.add(reporter)
+ private def addSpanReporter(reporter: SpanReporter, name: String): Registration = {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(name))
+ val reporterEntry = new SpanReporterEntry(
+ id = reporterCounter.incrementAndGet(),
+ reporter = reporter,
+ bufferCapacity = 1024,
+ executionContext = ExecutionContext.fromExecutorService(executor)
+ )
+
+ spanReporters.put(reporterEntry.id, reporterEntry)
+ createRegistration(reporterEntry.id, spanReporters)
+ }
- new Registration {
- override def cancel(): Boolean = true
- }
+ private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration {
+ override def cancel(): Boolean =
+ metricReporters.remove(id).nonEmpty
}
override def stopAll(): Future[Unit] = {
implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
val reporterStopFutures = Vector.newBuilder[Future[Unit]]
- while(!metricReporters.isEmpty) {
- val entry = metricReporters.poll()
- if(entry != null) {
- reporterStopFutures += stopReporter(entry)
+
+ while(metricReporters.nonEmpty) {
+ val (idToRemove, _) = metricReporters.head
+ metricReporters.remove(idToRemove).foreach { entry =>
+ reporterStopFutures += stopMetricReporter(entry)
+ }
+ }
+
+ while(spanReporters.nonEmpty) {
+ val (idToRemove, _) = spanReporters.head
+ spanReporters.remove(idToRemove).foreach { entry =>
+ reporterStopFutures += stopSpanReporter(entry)
}
}
@@ -102,29 +126,51 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
private[kamon] def reconfigure(config: Config): Unit = synchronized {
val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS)
+ val traceTickIntervalMillis = config.getDuration("kamon.trace.tick-interval", TimeUnit.MILLISECONDS)
+
val currentTicker = metricsTickerSchedule.get()
if(currentTicker != null) {
currentTicker.cancel(true)
}
// Reconfigure all registered reporters
- metricReporters.iterator().asScala.foreach(entry =>
+ metricReporters.foreach { case (_, entry) =>
Future(entry.reporter.reconfigure(config))(entry.executionContext)
- )
+ }
+
+ spanReporters.foreach { case (_, entry) =>
+ Future(entry.reporter.reconfigure(config))(entry.executionContext)
+ }
metricsTickerSchedule.set {
registryExecutionContext.scheduleAtFixedRate(
new MetricTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
)
}
- }
+ spanReporterTickerSchedule.set {
+ registryExecutionContext.scheduleAtFixedRate(
+ new SpanTicker(spanReporters), traceTickIntervalMillis, traceTickIntervalMillis, TimeUnit.MILLISECONDS
+ )
+ }
+ }
private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
- spanReporters.iterator().asScala.foreach(_.reportSpan(span))
+ spanReporters.foreach { case (_, reporterEntry) =>
+ if(reporterEntry.isActive)
+ reporterEntry.buffer.offer(span)
+ }
+ }
+
+ private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
+ entry.isActive = false
+
+ Future(entry.reporter.stop())(entry.executionContext).andThen {
+ case _ => entry.executionContext.shutdown()
+ }(ExecutionContext.fromExecutor(registryExecutionContext))
}
- private def stopReporter(entry: ReporterEntry): Future[Unit] = {
+ private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
entry.isActive = false
Future(entry.reporter.stop())(entry.executionContext).andThen {
@@ -132,14 +178,24 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
}(ExecutionContext.fromExecutor(registryExecutionContext))
}
- private class ReporterEntry(
+ private class MetricReporterEntry(
@volatile var isActive: Boolean = true,
val id: Long,
- val reporter: MetricsReporter,
+ val reporter: MetricReporter,
val executionContext: ExecutionContextExecutorService
)
- private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
+ private class SpanReporterEntry(
+ @volatile var isActive: Boolean = true,
+ val id: Long,
+ val reporter: SpanReporter,
+ val bufferCapacity: Int,
+ val executionContext: ExecutionContextExecutorService
+ ) {
+ val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity)
+ }
+
+ private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
val logger = Logger(classOf[MetricTicker])
var lastTick = System.currentTimeMillis()
@@ -150,7 +206,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
metrics = snapshotGenerator.snapshot()
)
- reporterEntries.iterator().asScala.foreach { entry =>
+ reporterEntries.foreach { case (_, entry) =>
Future {
if(entry.isActive)
entry.reporter.reportTickSnapshot(tickSnapshot)
@@ -164,4 +220,19 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
case NonFatal(t) => logger.error("Error while running a tick", t)
}
}
+
+ private class SpanTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
+ override def run(): Unit = {
+ spanReporters.foreach {
+ case (_, entry) =>
+
+ val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity)
+ entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
+
+ Future {
+ entry.reporter.reportSpans(spanBatch.asScala)
+ }(entry.executionContext)
+ }
+ }
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
new file mode 100644
index 00000000..b87f5530
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
@@ -0,0 +1,17 @@
+package kamon.metric
+
+import kamon.util.MeasurementUnit
+
+
+class DistributionAccumulator(dynamicRange: DynamicRange) {
+ private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator",
+ tags = Map.empty, measurementUnit = MeasurementUnit.none, dynamicRange)
+
+
+ def add(distribution: Distribution): Unit = {
+ distribution.bucketsIterator.foreach(b => accumulatorHistogram.record(b.value, b.frequency))
+ }
+
+ def result(): Distribution =
+ accumulatorHistogram.snapshot().distribution
+}
diff --git a/kamon-core/src/main/scala/kamon/util/MetricScaler.scala b/kamon-core/src/main/scala/kamon/metric/Scaler.scala
index ca77a9a1..f8f51c00 100644
--- a/kamon-core/src/main/scala/kamon/util/MetricScaler.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Scaler.scala
@@ -1,9 +1,9 @@
-package kamon.util
+package kamon.metric
-import kamon.metric.{DynamicRange, HdrHistogram, MetricDistribution, MetricValue}
+import kamon.util.MeasurementUnit
import kamon.util.MeasurementUnit.Dimension
-class MetricScaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: MeasurementUnit, dynamicRange: DynamicRange) {
+class Scaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: MeasurementUnit, dynamicRange: DynamicRange) {
require(targetTimeUnit.dimension == Dimension.Time, "timeUnit must be in the time dimension.")
require(targetInformationUnit.dimension == Dimension.Information, "informationUnit must be in the information dimension.")
diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
index e8587ffe..6aed1ab3 100644
--- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
@@ -1,8 +1,15 @@
package kamon.metric
-
import kamon.util.MeasurementUnit
+
+/**
+ *
+ * @param interval
+ * @param metrics
+ */
+case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot)
+
case class Interval(from: Long, to: Long)
case class MetricsSnapshot(
@@ -12,10 +19,6 @@ case class MetricsSnapshot(
counters: Seq[MetricValue]
)
-case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot)
-
-
-
/**
* Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges.
*