aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-06-15 22:56:17 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-06-15 22:56:17 +0200
commitbc481389e427c83069b092e24200dbe960aaeb67 (patch)
tree56fc96d258d4d921880610cde811d24c8c675784
parent8aa64c486f2ad1f31a5f27a0780f8c43f31b7f8c (diff)
downloadKamon-bc481389e427c83069b092e24200dbe960aaeb67.tar.gz
Kamon-bc481389e427c83069b092e24200dbe960aaeb67.tar.bz2
Kamon-bc481389e427c83069b092e24200dbe960aaeb67.zip
expose the internal scheduler
-rw-r--r--kamon-core/src/main/resources/reference.conf4
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala7
-rw-r--r--kamon-core/src/main/scala/kamon/util/Registration.scala24
5 files changed, 48 insertions, 11 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 125a1075..6ad06325 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -17,6 +17,10 @@ kamon {
# Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`.
reporters = [ ]
+ # Pool size for the executor service that will run sampling on MinMaxCounter instruments. This scheduler is accesible
+ # through Kamon.scheduler()
+ scheduler-pool-size = 2
+
metric {
tick-interval = 60 seconds
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 81cd3f56..fc121d52 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -20,10 +20,11 @@ import io.opentracing.propagation.Format
import io.opentracing.{ActiveSpan, Span, SpanContext}
import kamon.metric._
import kamon.trace.Tracer
-import kamon.util.{Filters, MeasurementUnit}
+import kamon.util.{Filters, MeasurementUnit, Registration}
import scala.concurrent.Future
import java.time.Duration
+import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
import io.opentracing.ActiveSpan.Continuation
import org.slf4j.LoggerFactory
@@ -37,7 +38,8 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
@volatile private var _environment = Environment.fromConfig(_config)
@volatile private var _filters = Filters.fromConfig(_config)
- private val _metrics = new MetricRegistry(_config)
+ private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
+ private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporters = new ReporterRegistryImpl(_metrics, _config)
private val _tracer = new Tracer(Kamon, _reporters, _config)
private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
@@ -60,6 +62,11 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
logger.error("Exception occurred while trying to run a OnReconfigureHook", error)
)
})
+
+ _scheduler match {
+ case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
+ case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other)
+ }
}
@@ -176,6 +183,13 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
def onReconfigure(hook: OnReconfigureHook): Unit = synchronized {
_onReconfigureHooks = hook +: _onReconfigureHooks
}
+
+ def scheduler(): ScheduledExecutorService =
+ _scheduler
+
+ private def schedulerPoolSize(config: Config): Int =
+ config.getInt("kamon.scheduler-pool-size")
+
}
trait OnReconfigureHook {
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 1cdef6ab..e18b6a78 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -22,6 +22,7 @@ import java.util.concurrent._
import com.typesafe.config.Config
import kamon.metric._
import kamon.trace.Span
+import kamon.util.Registration
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -41,11 +42,6 @@ trait ReporterRegistry {
def stopAllReporters(): Future[Unit]
}
-
-trait Registration {
- def cancel(): Boolean
-}
-
trait MetricReporter {
def start(): Unit
def stop(): Unit
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
index cecc2c19..07c1d202 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
@@ -24,13 +24,12 @@ import kamon.util.MeasurementUnit
import scala.collection.concurrent.TrieMap
import java.time.Duration
-import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
import org.slf4j.LoggerFactory
-class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator {
- private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-min-max-counter-sampler"))
+class MetricRegistry(initialConfig: Config, scheduler: ScheduledExecutorService) extends MetricsSnapshotGenerator {
private val logger = LoggerFactory.getLogger(classOf[MetricRegistry])
private val instrumentFactory = new AtomicReference[InstrumentFactory]()
private val metrics = TrieMap.empty[String, BaseMetric[_, _]]
@@ -52,7 +51,7 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator {
lookupMetric(name, unit, InstrumentTypes.Gauge)(new GaugeMetricImpl(name, unit, instrumentFactory))
def minMaxCounter(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounterMetric =
- lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory, registryExecutionContext))
+ lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory, scheduler))
override def snapshot(): MetricsSnapshot = synchronized {
diff --git a/kamon-core/src/main/scala/kamon/util/Registration.scala b/kamon-core/src/main/scala/kamon/util/Registration.scala
new file mode 100644
index 00000000..80142ffb
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/Registration.scala
@@ -0,0 +1,24 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.util
+
+/**
+ * Implementations of this interface encapsulate whatever state is necessary to properly handle de-registration from
+ * the component that returned the registration.
+ */
+trait Registration {
+ def cancel(): Boolean
+}