From 083c31cb0eb18dce4f2a46d52b3606a92128230b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Tue, 30 Jan 2018 15:29:33 +0100 Subject: turn all Kamon threads into daemon threads, except for reporters, fixes #502 --- build.sbt | 5 +- .../src/test/scala/kamon/KamonLifecycleSpec.scala | 67 ++++++++++++++++++++++ kamon-core/src/main/scala/kamon/Kamon.scala | 2 +- .../src/main/scala/kamon/ReporterRegistry.scala | 2 +- kamon-core/src/main/scala/kamon/package.scala | 6 +- 5 files changed, 77 insertions(+), 5 deletions(-) create mode 100644 kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala diff --git a/build.sbt b/build.sbt index 583f8a16..51a5a764 100644 --- a/build.sbt +++ b/build.sbt @@ -50,7 +50,10 @@ lazy val testkit = (project in file("kamon-testkit")) lazy val coreTests = (project in file("kamon-core-tests")) - .settings(moduleName := "kamon-core-tests", resolvers += Resolver.mavenLocal) + .settings( + moduleName := "kamon-core-tests", + resolvers += Resolver.mavenLocal, + fork in Test := true) .settings(noPublishing: _*) .settings(commonSettings: _*) .settings( diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala new file mode 100644 index 00000000..b5d0425b --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala @@ -0,0 +1,67 @@ +package kamon + +import java.io.File +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config +import kamon.metric.PeriodSnapshot +import kamon.trace.Span +import org.scalatest.{Matchers, WordSpec} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ + +class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{ + + "the Kamon lifecycle" should { + "keep the JVM running if reporters are running" in { + val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter")) + Thread.sleep(5000) + process.isAlive shouldBe true + process.destroyForcibly().waitFor(5, TimeUnit.SECONDS) + } + + "let the JVM stop after all reporters are stopped" in { + val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithTemporaryReporter")) + Thread.sleep(2000) + process.isAlive shouldBe true + + eventually(timeout(7 seconds)) { + process.isAlive shouldBe false + process.exitValue() shouldBe 0 + } + } + } + + + def createProcessCommand(mainClass: String): String = { + System.getProperty("java.home") + File.separator + "bin" + File.separator + "java" + + " -cp " + System.getProperty("java.class.path") + " " + mainClass + } +} + +class DummyMetricReporter extends MetricReporter { + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(config: Config): Unit = {} + override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {} +} + +class DummySpanReporter extends SpanReporter { + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(config: Config): Unit = {} + override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {} +} + +object KamonWithRunningReporter extends App { + Kamon.addReporter(new DummyMetricReporter()) + Kamon.addReporter(new DummySpanReporter()) +} + +object KamonWithTemporaryReporter extends App { + Kamon.addReporter(new DummyMetricReporter()) + Kamon.addReporter(new DummySpanReporter()) + + Thread.sleep(5000) + Kamon.stopAllReporters() +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 562ef615..8f69ab41 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -37,7 +37,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { @volatile private var _filters = Filters.fromConfig(_config) private val _clock = new Clock.Default() - private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) + private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler", daemon = true)) private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock) private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock) diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index bd28e871..92465de8 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -67,7 +67,7 @@ object ReporterRegistry { private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink { private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) - private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) + private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true)) private val reporterCounter = new AtomicLong(0L) private var registryConfiguration = readRegistryConfiguration(initialConfig) diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala index c746efa1..d3b25500 100644 --- a/kamon-core/src/main/scala/kamon/package.scala +++ b/kamon-core/src/main/scala/kamon/package.scala @@ -30,18 +30,19 @@ package object kamon { /** * Creates a thread factory that assigns the specified name to all created Threads. */ - def threadFactory(name: String): ThreadFactory = + def threadFactory(name: String, daemon: Boolean = false): ThreadFactory = new ThreadFactory { val defaultFactory = Executors.defaultThreadFactory() override def newThread(r: Runnable): Thread = { val thread = defaultFactory.newThread(r) thread.setName(name) + thread.setDaemon(daemon) thread } } - def numberedThreadFactory(name: String): ThreadFactory = + def numberedThreadFactory(name: String, daemon: Boolean = false): ThreadFactory = new ThreadFactory { val count = new AtomicLong() val defaultFactory = Executors.defaultThreadFactory() @@ -49,6 +50,7 @@ package object kamon { override def newThread(r: Runnable): Thread = { val thread = defaultFactory.newThread(r) thread.setName(name + "-" + count.incrementAndGet().toString) + thread.setDaemon(daemon) thread } } -- cgit v1.2.3