aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2019-02-11 23:16:54 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2019-02-12 02:49:23 +0100
commit7152517f2586a5b40726365a756087ddddc099ca (patch)
tree26b4d9a1768dd0460a4cedb9fc4fde27750936a4 /kamon-core
parent4ce838b1af6257625b27ea38d55947912cba00c9 (diff)
downloadKamon-7152517f2586a5b40726365a756087ddddc099ca.tar.gz
Kamon-7152517f2586a5b40726365a756087ddddc099ca.tar.bz2
Kamon-7152517f2586a5b40726365a756087ddddc099ca.zip
self-review changes and use a thread pool for the embedded status page
server
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/reference.conf8
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala66
-rw-r--r--kamon-core/src/main/scala/kamon/StatusPage.scala22
-rw-r--r--kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala38
-rw-r--r--kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/status/Status.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/status/StatusPageServer.scala83
7 files changed, 125 insertions, 115 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 37491c79..7a5576bc 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -27,14 +27,14 @@ kamon {
status {
- # When enabled, Kamon will start an embedded web server to publish the status mini-site that contains basic
- # status and debugging information.
+ # When enabled Kamon will start an embedded web server to publish the status page mini-site, which contains basic
+ # system information that can be used for debugging and troubleshooting issues with Kamon.
enabled = true
- # Controls the hostname and port in which the status mini-site HTTP server will be listening.
+ # Controls the hostname and port on which the status page embedded server will be listening.
listen {
hostname = "0.0.0.0"
- port = 9912
+ port = 5266
}
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 284c7553..cfeea19e 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -15,10 +15,6 @@
package kamon
-import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import kamon.metric.PeriodSnapshot
-import kamon.trace.Span
-
object Kamon extends ClassLoading
with Configuration
with Utilities
@@ -36,64 +32,6 @@ object Kamon extends ClassLoading
_environment
onReconfigure(newConfig => {
- _environment = Environment.fromConfig(config)
- })
-}
-
-
-object QuickTest extends App {
- val manualConfig =
- """
- |kamon.modules {
- | kamon-zipkin {
- | enabled = false
- | description = "Module that sends data to particular places"
- | kind = metric
- | class = kamon.MyCustomMetricDude
- | }
- |}
- |
- |kamon.environment.tags {
- | one = test
- |}
- """.stripMargin
-
- val newConfig = ConfigFactory.parseString(manualConfig).withFallback(Kamon.config())
- Kamon.reconfigure(newConfig)
-
-
-
- Kamon.loadModules()
- Kamon.registerModule("my-module", new kamon.module.MetricReporter {
- override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
- override def start(): Unit = {}
- override def stop(): Unit = {}
- override def reconfigure(newConfig: Config): Unit = {}
- })
-
- Kamon.registerModule("my-module-for-spans", new kamon.module.SpanReporter {
- override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {}
- override def start(): Unit = {}
- override def stop(): Unit = {}
- override def reconfigure(newConfig: Config): Unit = {}
+ _environment = Environment.fromConfig(newConfig)
})
-
-
- Kamon.histogram("test").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").record(10)
- Kamon.rangeSampler("test-rs").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").increment(34)
- Kamon.counter("test-counter").refine("tagcito" -> "value").increment(42)
-
- //println("JSON CONFIG: " + Kamon.config().root().render(ConfigRenderOptions.concise().setFormatted(true).setJson(true)))
-
-
- Thread.sleep(100000000)
-
-
-}
-
-class MyCustomMetricDude extends kamon.module.MetricReporter {
- override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
- override def start(): Unit = {}
- override def stop(): Unit = {}
- override def reconfigure(newConfig: Config): Unit = {}
-}
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/StatusPage.scala b/kamon-core/src/main/scala/kamon/StatusPage.scala
index e5c8ef52..1ac60c1b 100644
--- a/kamon-core/src/main/scala/kamon/StatusPage.scala
+++ b/kamon-core/src/main/scala/kamon/StatusPage.scala
@@ -1,9 +1,13 @@
package kamon
import com.typesafe.config.Config
-import kamon.status.{StatusPageServer, Status}
+import kamon.status.{Status, StatusPageServer}
+import org.slf4j.LoggerFactory
+
+import scala.util.{Failure, Success, Try}
trait StatusPage { self: Configuration with ClassLoading with ModuleLoading with Metrics with Configuration =>
+ private val _log = LoggerFactory.getLogger(classOf[StatusPage])
@volatile private var _statusPageServer: Option[StatusPageServer] = None
private val _status = new Status(self._moduleRegistry, self._metricsRegistry, self)
@@ -47,10 +51,20 @@ trait StatusPage { self: Configuration with ClassLoading with ModuleLoading with
}
private def startServer(hostname: String, port: Int, resourceLoader: ClassLoader): Unit = {
- val server = new StatusPageServer(hostname, port, resourceLoader, _status)
- server.start()
+ Try {
+
+ val server = new StatusPageServer(hostname, port, resourceLoader, _status)
+ server.start()
+ server
- _statusPageServer = Some(server)
+ } match {
+ case Success(server) =>
+ _log.info(s"Status page started on http://$hostname:$port/")
+ _statusPageServer = Some(server)
+
+ case Failure(t) =>
+ _log.error("Failed to start the status page embedded server", t)
+ }
}
private def stopServer(): Unit = {
diff --git a/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
index 81b94f29..d45cd80f 100644
--- a/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
@@ -76,7 +76,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
* configured modules state.
*/
def load(config: Config): Unit = synchronized {
- val configuredModules = readModuleSettings(config)
+ val configuredModules = readModuleSettings(config, true)
val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }
// Start, reconfigure and stop modules that are still present but disabled.
@@ -233,7 +233,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
_spanReporterModules.values
}
- private def readModuleSettings(config: Config): Seq[Module.Settings] = {
+ private def readModuleSettings(config: Config, emitConfigurationWarnings: Boolean): Seq[Module.Settings] = {
val moduleConfigs = config.getConfig("kamon.modules").configurations
val moduleSettings = moduleConfigs.map {
case (moduleName, moduleConfig) =>
@@ -253,11 +253,18 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
})
- moduleSettings.failed.foreach { t =>
- _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
+ if(emitConfigurationWarnings) {
+ moduleSettings.failed.foreach { t =>
+ _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
- if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
- _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
+ val hasLegacySettings =
+ moduleConfig.hasPath("requires-aspectj") ||
+ moduleConfig.hasPath("auto-start") ||
+ moduleConfig.hasPath("extension-class")
+
+ if (hasLegacySettings) {
+ _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
+ }
}
}
@@ -281,7 +288,10 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
Module.Settings(name, description, moduleClazz, inferredModuleKind, true)
}
- moduleSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t))
+ if(emitConfigurationWarnings) {
+ moduleSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t))
+ }
+
moduleSettings
})
.filter(_.isSuccess)
@@ -291,11 +301,13 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
val (repeatedLegacyModules, uniqueLegacyModules) = legacyModuleSettings
.partition(lm => moduleSettings.find(_.clazz.getName == lm.clazz.getName).nonEmpty)
- repeatedLegacyModules.foreach(m =>
- _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
+ if(emitConfigurationWarnings) {
+ repeatedLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
- uniqueLegacyModules.foreach(m =>
- _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
+ uniqueLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
+ }
moduleSettings ++ uniqueLegacyModules
@@ -334,7 +346,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
* Returns the current status of this module registry.
*/
private[kamon] def status(): Status.ModuleRegistry = {
- val automaticallyAddedModules = readModuleSettings(configuration.config()).map(moduleSettings => {
+ val automaticallyAddedModules = readModuleSettings(configuration.config(), false).map(moduleSettings => {
val isActive = _registeredModules.get(moduleSettings.name).nonEmpty
Status.Module(
@@ -342,7 +354,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c
moduleSettings.description,
moduleSettings.clazz.getCanonicalName,
moduleSettings.kind,
- isProgrammaticallyRegistered = false,
+ programmaticallyRegistered = false,
moduleSettings.enabled,
isActive)
})
diff --git a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala
index 2291648c..5ab0eb9f 100644
--- a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala
+++ b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala
@@ -39,9 +39,9 @@ object JsonMarshalling {
.value("description", m.description)
.value("clazz", m.clazz)
.value("kind", moduleKindString(m.kind))
- .value("isProgrammaticallyRegistered", m.isProgrammaticallyRegistered)
- .value("enabled", m.isEnabled)
- .value("started", m.isStarted)
+ .value("programmaticallyRegistered", m.programmaticallyRegistered)
+ .value("enabled", m.enabled)
+ .value("started", m.started)
.end()
})
@@ -107,7 +107,7 @@ object JsonMarshalling {
override def toJson(instance: Status.Instrumentation, builder: JavaStringBuilder): Unit = {
val instrumentationObject = JsonWriter.on(builder)
.`object`()
- .value("isActive", instance.isIActive)
+ .value("active", instance.active)
.`object`("modules")
instance.modules.asScala.foreach {
diff --git a/kamon-core/src/main/scala/kamon/status/Status.scala b/kamon-core/src/main/scala/kamon/status/Status.scala
index 956e3594..ef5bb8eb 100644
--- a/kamon-core/src/main/scala/kamon/status/Status.scala
+++ b/kamon-core/src/main/scala/kamon/status/Status.scala
@@ -9,7 +9,8 @@ import kamon.module.Module.{Kind => ModuleKind}
import java.util.{Collections, List => JavaList, Map => JavaMap}
/**
- * Exposes Kamon components' status information. This is meant to be used for informational and debugging purposes.
+ * Exposes Kamon components' status information. This is meant to be used for informational and debugging purposes and
+ * by no means should replace the use of reporters to extract information from Kamon.
*/
class Status(_moduleRegistry: ModuleRegistry, _metricRegistry: MetricRegistry, configuration: Configuration) {
@@ -20,8 +21,8 @@ class Status(_moduleRegistry: ModuleRegistry, _metricRegistry: MetricRegistry, c
Status.Settings(BuildInfo.version, Kamon.environment, configuration.config())
/**
- * Status of the module registry. Describes what modules have been detected in the classpath and their current
- * statuses.
+ * Status of the module registry. Describes what modules have been detected and registered, either from the classpath
+ * or programatically and their current status.
*/
def moduleRegistry(): Status.ModuleRegistry =
_moduleRegistry.status()
@@ -72,9 +73,9 @@ object Status {
description: String,
clazz: String,
kind: ModuleKind,
- isProgrammaticallyRegistered: Boolean,
- isEnabled: Boolean,
- isStarted: Boolean
+ programmaticallyRegistered: Boolean,
+ enabled: Boolean,
+ started: Boolean
)
case class MetricRegistry(
@@ -94,7 +95,7 @@ object Status {
* outside Kamon.
*/
private[kamon] case class Instrumentation(
- isIActive: Boolean,
+ active: Boolean,
modules: JavaMap[String, String],
errors: JavaMap[String, JavaList[Throwable]]
)
diff --git a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala
index 2784b87a..b2c7ff74 100644
--- a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala
+++ b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala
@@ -1,15 +1,19 @@
package kamon.status
+import java.io.InputStream
+
import fi.iki.elonen.NanoHTTPD
-import fi.iki.elonen.NanoHTTPD.Response
import fi.iki.elonen.NanoHTTPD.Response.{Status => StatusCode}
+import java.util.Collections
+import java.util.concurrent.{ExecutorService, Executors}
+
+import scala.collection.JavaConverters.asScalaBufferConverter
class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader, status: Status)
extends NanoHTTPD(hostname, port) {
private val RootResourceDirectory = "status"
- private val ResourceExtensionRegex = ".*\\.([a-zA-Z]*)".r
-
+ private val ResourceExtensionRegex = ".*\\.([a-zA-Z0-9]*)".r
override def serve(session: NanoHTTPD.IHTTPSession): NanoHTTPD.Response = {
if(session.getMethod() == NanoHTTPD.Method.GET) {
@@ -25,44 +29,85 @@ class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader,
}
} else {
+
// Serve resources from the status page folder.
- val resource = if (session.getUri() == "/") "/index.html" else session.getUri()
- val resourcePath = RootResourceDirectory + resource
+ val requestedResource = if (session.getUri() == "/") "/index.html" else session.getUri()
+ val resourcePath = RootResourceDirectory + requestedResource
val resourceStream = resourceLoader.getResourceAsStream(resourcePath)
- if (resourceStream == null) NotFound else {
- NanoHTTPD.newChunkedResponse(StatusCode.OK, mimeType(resource), resourceStream)
- }
+ if (resourceStream == null) NotFound else resource(requestedResource, resourceStream)
}
} else NotAllowed
}
+ override def start(): Unit = {
+ setAsyncRunner(new ThreadPoolRunner(Executors.newFixedThreadPool(2)))
+ start(NanoHTTPD.SOCKET_READ_TIMEOUT, false)
+ }
+
private def mimeType(resource: String): String = {
val ResourceExtensionRegex(resourceExtension) = resource
resourceExtension match {
- case "css" => "text/css"
- case "js" => "application/javascript"
- case "ico" => "image/x-icon"
- case "svg" => "image/svg+xml"
- case "html" => "text/html"
- case _ => "text/plain"
+ case "css" => "text/css"
+ case "js" => "application/javascript"
+ case "ico" => "image/x-icon"
+ case "svg" => "image/svg+xml"
+ case "html" => "text/html"
+ case "woff2" => "font/woff2"
+ case _ => "text/plain"
}
}
- private def json[T : JsonMarshalling](instance: T): Response = {
+ private def json[T](instance: T)(implicit marshalling: JsonMarshalling[T]) = {
val builder = new java.lang.StringBuilder()
- implicitly[JsonMarshalling[T]].toJson(instance, builder)
- NanoHTTPD.newFixedLengthResponse(StatusCode.OK, "application/json", builder.toString())
+ marshalling.toJson(instance, builder)
+
+ val response = NanoHTTPD.newFixedLengthResponse(StatusCode.OK, "application/json", builder.toString())
+ response.closeConnection(true)
+ response
+ }
+
+ private def resource(name: String, stream: InputStream) = {
+ val response = NanoHTTPD.newChunkedResponse(StatusCode.OK, mimeType(name), stream)
+ response.closeConnection(true)
+ response
}
private val NotAllowed = NanoHTTPD.newFixedLengthResponse(
StatusCode.METHOD_NOT_ALLOWED,
NanoHTTPD.MIME_PLAINTEXT,
- "Only GET requests are allowed.")
+ "Only GET requests are allowed."
+ )
private val NotFound = NanoHTTPD.newFixedLengthResponse(
StatusCode.NOT_FOUND,
NanoHTTPD.MIME_PLAINTEXT,
- "The requested resource was not found.")
+ "The requested resource was not found."
+ )
+
+ // Closing the connections will ensure that the thread pool will not be exhausted by keep alive
+ // connections from the browsers.
+ NotAllowed.closeConnection(true)
+ NotFound.closeConnection(true)
+
+
+ /**
+ * AsyncRunner that uses a thread pool for handling requests rather than spawning a new thread for each request (as
+ * the default runner does).
+ */
+ private class ThreadPoolRunner(executorService: ExecutorService) extends NanoHTTPD.AsyncRunner {
+ final private val _openRequests = Collections.synchronizedList(new java.util.LinkedList[NanoHTTPD#ClientHandler]())
+
+ override def closeAll(): Unit =
+ _openRequests.asScala.foreach(_.close())
+
+ override def closed(clientHandler: NanoHTTPD#ClientHandler): Unit =
+ _openRequests.remove(clientHandler)
+
+ override def exec(clientHandler: NanoHTTPD#ClientHandler): Unit = {
+ executorService.submit(clientHandler)
+ _openRequests.add(clientHandler)
+ }
+ }
} \ No newline at end of file