diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-06-15 22:56:17 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-06-15 22:56:17 +0200 |
commit | bc481389e427c83069b092e24200dbe960aaeb67 (patch) | |
tree | 56fc96d258d4d921880610cde811d24c8c675784 /kamon-core/src | |
parent | 8aa64c486f2ad1f31a5f27a0780f8c43f31b7f8c (diff) | |
download | Kamon-bc481389e427c83069b092e24200dbe960aaeb67.tar.gz Kamon-bc481389e427c83069b092e24200dbe960aaeb67.tar.bz2 Kamon-bc481389e427c83069b092e24200dbe960aaeb67.zip |
expose the internal scheduler
Diffstat (limited to 'kamon-core/src')
5 files changed, 48 insertions, 11 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 125a1075..6ad06325 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -17,6 +17,10 @@ kamon { # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`. reporters = [ ] + # Pool size for the executor service that will run sampling on MinMaxCounter instruments. This scheduler is accesible + # through Kamon.scheduler() + scheduler-pool-size = 2 + metric { tick-interval = 60 seconds diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 81cd3f56..fc121d52 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -20,10 +20,11 @@ import io.opentracing.propagation.Format import io.opentracing.{ActiveSpan, Span, SpanContext} import kamon.metric._ import kamon.trace.Tracer -import kamon.util.{Filters, MeasurementUnit} +import kamon.util.{Filters, MeasurementUnit, Registration} import scala.concurrent.Future import java.time.Duration +import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} import io.opentracing.ActiveSpan.Continuation import org.slf4j.LoggerFactory @@ -37,7 +38,8 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac @volatile private var _environment = Environment.fromConfig(_config) @volatile private var _filters = Filters.fromConfig(_config) - private val _metrics = new MetricRegistry(_config) + private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) + private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporters = new ReporterRegistryImpl(_metrics, _config) private val _tracer = new Tracer(Kamon, _reporters, _config) private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] @@ -60,6 +62,11 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac logger.error("Exception occurred while trying to run a OnReconfigureHook", error) ) }) + + _scheduler match { + case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config)) + case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other) + } } @@ -176,6 +183,13 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac def onReconfigure(hook: OnReconfigureHook): Unit = synchronized { _onReconfigureHooks = hook +: _onReconfigureHooks } + + def scheduler(): ScheduledExecutorService = + _scheduler + + private def schedulerPoolSize(config: Config): Int = + config.getInt("kamon.scheduler-pool-size") + } trait OnReconfigureHook { diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 1cdef6ab..e18b6a78 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -22,6 +22,7 @@ import java.util.concurrent._ import com.typesafe.config.Config import kamon.metric._ import kamon.trace.Span +import kamon.util.Registration import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -41,11 +42,6 @@ trait ReporterRegistry { def stopAllReporters(): Future[Unit] } - -trait Registration { - def cancel(): Boolean -} - trait MetricReporter { def start(): Unit def stop(): Unit diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index cecc2c19..07c1d202 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -24,13 +24,12 @@ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap import java.time.Duration -import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService import org.slf4j.LoggerFactory -class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { - private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-min-max-counter-sampler")) +class MetricRegistry(initialConfig: Config, scheduler: ScheduledExecutorService) extends MetricsSnapshotGenerator { private val logger = LoggerFactory.getLogger(classOf[MetricRegistry]) private val instrumentFactory = new AtomicReference[InstrumentFactory]() private val metrics = TrieMap.empty[String, BaseMetric[_, _]] @@ -52,7 +51,7 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { lookupMetric(name, unit, InstrumentTypes.Gauge)(new GaugeMetricImpl(name, unit, instrumentFactory)) def minMaxCounter(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounterMetric = - lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory, registryExecutionContext)) + lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory, scheduler)) override def snapshot(): MetricsSnapshot = synchronized { diff --git a/kamon-core/src/main/scala/kamon/util/Registration.scala b/kamon-core/src/main/scala/kamon/util/Registration.scala new file mode 100644 index 00000000..80142ffb --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/Registration.scala @@ -0,0 +1,24 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +/** + * Implementations of this interface encapsulate whatever state is necessary to properly handle de-registration from + * the component that returned the registration. + */ +trait Registration { + def cancel(): Boolean +} |