aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-01-30 15:29:33 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2018-01-30 15:41:54 +0100
commit083c31cb0eb18dce4f2a46d52b3606a92128230b (patch)
treea2e263b111b279128c1802c5333970f58656699e
parent4ab94c41bf5bd157d5ff1639ca5635ac5d98a3e4 (diff)
downloadKamon-083c31cb0eb18dce4f2a46d52b3606a92128230b.tar.gz
Kamon-083c31cb0eb18dce4f2a46d52b3606a92128230b.tar.bz2
Kamon-083c31cb0eb18dce4f2a46d52b3606a92128230b.zip
turn all Kamon threads into daemon threads, except for reporters, fixes #502
-rw-r--r--build.sbt5
-rw-r--r--kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/package.scala6
5 files changed, 77 insertions, 5 deletions
diff --git a/build.sbt b/build.sbt
index 583f8a16..51a5a764 100644
--- a/build.sbt
+++ b/build.sbt
@@ -50,7 +50,10 @@ lazy val testkit = (project in file("kamon-testkit"))
lazy val coreTests = (project in file("kamon-core-tests"))
- .settings(moduleName := "kamon-core-tests", resolvers += Resolver.mavenLocal)
+ .settings(
+ moduleName := "kamon-core-tests",
+ resolvers += Resolver.mavenLocal,
+ fork in Test := true)
.settings(noPublishing: _*)
.settings(commonSettings: _*)
.settings(
diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
new file mode 100644
index 00000000..b5d0425b
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
@@ -0,0 +1,67 @@
+package kamon
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import com.typesafe.config.Config
+import kamon.metric.PeriodSnapshot
+import kamon.trace.Span
+import org.scalatest.{Matchers, WordSpec}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
+
+class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{
+
+ "the Kamon lifecycle" should {
+ "keep the JVM running if reporters are running" in {
+ val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter"))
+ Thread.sleep(5000)
+ process.isAlive shouldBe true
+ process.destroyForcibly().waitFor(5, TimeUnit.SECONDS)
+ }
+
+ "let the JVM stop after all reporters are stopped" in {
+ val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithTemporaryReporter"))
+ Thread.sleep(2000)
+ process.isAlive shouldBe true
+
+ eventually(timeout(7 seconds)) {
+ process.isAlive shouldBe false
+ process.exitValue() shouldBe 0
+ }
+ }
+ }
+
+
+ def createProcessCommand(mainClass: String): String = {
+ System.getProperty("java.home") + File.separator + "bin" + File.separator + "java" +
+ " -cp " + System.getProperty("java.class.path") + " " + mainClass
+ }
+}
+
+class DummyMetricReporter extends MetricReporter {
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(config: Config): Unit = {}
+ override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
+}
+
+class DummySpanReporter extends SpanReporter {
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(config: Config): Unit = {}
+ override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {}
+}
+
+object KamonWithRunningReporter extends App {
+ Kamon.addReporter(new DummyMetricReporter())
+ Kamon.addReporter(new DummySpanReporter())
+}
+
+object KamonWithTemporaryReporter extends App {
+ Kamon.addReporter(new DummyMetricReporter())
+ Kamon.addReporter(new DummySpanReporter())
+
+ Thread.sleep(5000)
+ Kamon.stopAllReporters()
+}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 562ef615..8f69ab41 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -37,7 +37,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
@volatile private var _filters = Filters.fromConfig(_config)
private val _clock = new Clock.Default()
- private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
+ private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler", daemon = true))
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index bd28e871..92465de8 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -67,7 +67,7 @@ object ReporterRegistry {
private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink {
private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
- private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
+ private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true))
private val reporterCounter = new AtomicLong(0L)
private var registryConfiguration = readRegistryConfiguration(initialConfig)
diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala
index c746efa1..d3b25500 100644
--- a/kamon-core/src/main/scala/kamon/package.scala
+++ b/kamon-core/src/main/scala/kamon/package.scala
@@ -30,18 +30,19 @@ package object kamon {
/**
* Creates a thread factory that assigns the specified name to all created Threads.
*/
- def threadFactory(name: String): ThreadFactory =
+ def threadFactory(name: String, daemon: Boolean = false): ThreadFactory =
new ThreadFactory {
val defaultFactory = Executors.defaultThreadFactory()
override def newThread(r: Runnable): Thread = {
val thread = defaultFactory.newThread(r)
thread.setName(name)
+ thread.setDaemon(daemon)
thread
}
}
- def numberedThreadFactory(name: String): ThreadFactory =
+ def numberedThreadFactory(name: String, daemon: Boolean = false): ThreadFactory =
new ThreadFactory {
val count = new AtomicLong()
val defaultFactory = Executors.defaultThreadFactory()
@@ -49,6 +50,7 @@ package object kamon {
override def newThread(r: Runnable): Thread = {
val thread = defaultFactory.newThread(r)
thread.setName(name + "-" + count.incrementAndGet().toString)
+ thread.setDaemon(daemon)
thread
}
}