diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2019-02-11 23:16:54 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2019-02-12 02:49:23 +0100 |
commit | 7152517f2586a5b40726365a756087ddddc099ca (patch) | |
tree | 26b4d9a1768dd0460a4cedb9fc4fde27750936a4 /kamon-core | |
parent | 4ce838b1af6257625b27ea38d55947912cba00c9 (diff) | |
download | Kamon-7152517f2586a5b40726365a756087ddddc099ca.tar.gz Kamon-7152517f2586a5b40726365a756087ddddc099ca.tar.bz2 Kamon-7152517f2586a5b40726365a756087ddddc099ca.zip |
self-review changes and use a thread pool for the embedded status page
server
Diffstat (limited to 'kamon-core')
7 files changed, 125 insertions, 115 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 37491c79..7a5576bc 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -27,14 +27,14 @@ kamon { status { - # When enabled, Kamon will start an embedded web server to publish the status mini-site that contains basic - # status and debugging information. + # When enabled Kamon will start an embedded web server to publish the status page mini-site, which contains basic + # system information that can be used for debugging and troubleshooting issues with Kamon. enabled = true - # Controls the hostname and port in which the status mini-site HTTP server will be listening. + # Controls the hostname and port on which the status page embedded server will be listening. listen { hostname = "0.0.0.0" - port = 9912 + port = 5266 } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 284c7553..cfeea19e 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -15,10 +15,6 @@ package kamon -import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} -import kamon.metric.PeriodSnapshot -import kamon.trace.Span - object Kamon extends ClassLoading with Configuration with Utilities @@ -36,64 +32,6 @@ object Kamon extends ClassLoading _environment onReconfigure(newConfig => { - _environment = Environment.fromConfig(config) - }) -} - - -object QuickTest extends App { - val manualConfig = - """ - |kamon.modules { - | kamon-zipkin { - | enabled = false - | description = "Module that sends data to particular places" - | kind = metric - | class = kamon.MyCustomMetricDude - | } - |} - | - |kamon.environment.tags { - | one = test - |} - """.stripMargin - - val newConfig = ConfigFactory.parseString(manualConfig).withFallback(Kamon.config()) - Kamon.reconfigure(newConfig) - - - - Kamon.loadModules() - Kamon.registerModule("my-module", new kamon.module.MetricReporter { - override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {} - override def start(): Unit = {} - override def stop(): Unit = {} - override def reconfigure(newConfig: Config): Unit = {} - }) - - Kamon.registerModule("my-module-for-spans", new kamon.module.SpanReporter { - override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {} - override def start(): Unit = {} - override def stop(): Unit = {} - override def reconfigure(newConfig: Config): Unit = {} + _environment = Environment.fromConfig(newConfig) }) - - - Kamon.histogram("test").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").record(10) - Kamon.rangeSampler("test-rs").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").increment(34) - Kamon.counter("test-counter").refine("tagcito" -> "value").increment(42) - - //println("JSON CONFIG: " + Kamon.config().root().render(ConfigRenderOptions.concise().setFormatted(true).setJson(true))) - - - Thread.sleep(100000000) - - -} - -class MyCustomMetricDude extends kamon.module.MetricReporter { - override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {} - override def start(): Unit = {} - override def stop(): Unit = {} - override def reconfigure(newConfig: Config): Unit = {} -} +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/StatusPage.scala b/kamon-core/src/main/scala/kamon/StatusPage.scala index e5c8ef52..1ac60c1b 100644 --- a/kamon-core/src/main/scala/kamon/StatusPage.scala +++ b/kamon-core/src/main/scala/kamon/StatusPage.scala @@ -1,9 +1,13 @@ package kamon import com.typesafe.config.Config -import kamon.status.{StatusPageServer, Status} +import kamon.status.{Status, StatusPageServer} +import org.slf4j.LoggerFactory + +import scala.util.{Failure, Success, Try} trait StatusPage { self: Configuration with ClassLoading with ModuleLoading with Metrics with Configuration => + private val _log = LoggerFactory.getLogger(classOf[StatusPage]) @volatile private var _statusPageServer: Option[StatusPageServer] = None private val _status = new Status(self._moduleRegistry, self._metricsRegistry, self) @@ -47,10 +51,20 @@ trait StatusPage { self: Configuration with ClassLoading with ModuleLoading with } private def startServer(hostname: String, port: Int, resourceLoader: ClassLoader): Unit = { - val server = new StatusPageServer(hostname, port, resourceLoader, _status) - server.start() + Try { + + val server = new StatusPageServer(hostname, port, resourceLoader, _status) + server.start() + server - _statusPageServer = Some(server) + } match { + case Success(server) => + _log.info(s"Status page started on http://$hostname:$port/") + _statusPageServer = Some(server) + + case Failure(t) => + _log.error("Failed to start the status page embedded server", t) + } } private def stopServer(): Unit = { diff --git a/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala index 81b94f29..d45cd80f 100644 --- a/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala +++ b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala @@ -76,7 +76,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c * configured modules state. */ def load(config: Config): Unit = synchronized { - val configuredModules = readModuleSettings(config) + val configuredModules = readModuleSettings(config, true) val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded } // Start, reconfigure and stop modules that are still present but disabled. @@ -233,7 +233,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c _spanReporterModules.values } - private def readModuleSettings(config: Config): Seq[Module.Settings] = { + private def readModuleSettings(config: Config, emitConfigurationWarnings: Boolean): Seq[Module.Settings] = { val moduleConfigs = config.getConfig("kamon.modules").configurations val moduleSettings = moduleConfigs.map { case (moduleName, moduleConfig) => @@ -253,11 +253,18 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c }) - moduleSettings.failed.foreach { t => - _logger.warn(s"Failed to read configuration for module [$moduleName]", t) + if(emitConfigurationWarnings) { + moduleSettings.failed.foreach { t => + _logger.warn(s"Failed to read configuration for module [$moduleName]", t) - if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) { - _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration") + val hasLegacySettings = + moduleConfig.hasPath("requires-aspectj") || + moduleConfig.hasPath("auto-start") || + moduleConfig.hasPath("extension-class") + + if (hasLegacySettings) { + _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration") + } } } @@ -281,7 +288,10 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c Module.Settings(name, description, moduleClazz, inferredModuleKind, true) } - moduleSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t)) + if(emitConfigurationWarnings) { + moduleSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t)) + } + moduleSettings }) .filter(_.isSuccess) @@ -291,11 +301,13 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c val (repeatedLegacyModules, uniqueLegacyModules) = legacyModuleSettings .partition(lm => moduleSettings.find(_.clazz.getName == lm.clazz.getName).nonEmpty) - repeatedLegacyModules.foreach(m => - _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting.")) + if(emitConfigurationWarnings) { + repeatedLegacyModules.foreach(m => + _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting.")) - uniqueLegacyModules.foreach(m => - _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules.")) + uniqueLegacyModules.foreach(m => + _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules.")) + } moduleSettings ++ uniqueLegacyModules @@ -334,7 +346,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c * Returns the current status of this module registry. */ private[kamon] def status(): Status.ModuleRegistry = { - val automaticallyAddedModules = readModuleSettings(configuration.config()).map(moduleSettings => { + val automaticallyAddedModules = readModuleSettings(configuration.config(), false).map(moduleSettings => { val isActive = _registeredModules.get(moduleSettings.name).nonEmpty Status.Module( @@ -342,7 +354,7 @@ class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, c moduleSettings.description, moduleSettings.clazz.getCanonicalName, moduleSettings.kind, - isProgrammaticallyRegistered = false, + programmaticallyRegistered = false, moduleSettings.enabled, isActive) }) diff --git a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala index 2291648c..5ab0eb9f 100644 --- a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala +++ b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala @@ -39,9 +39,9 @@ object JsonMarshalling { .value("description", m.description) .value("clazz", m.clazz) .value("kind", moduleKindString(m.kind)) - .value("isProgrammaticallyRegistered", m.isProgrammaticallyRegistered) - .value("enabled", m.isEnabled) - .value("started", m.isStarted) + .value("programmaticallyRegistered", m.programmaticallyRegistered) + .value("enabled", m.enabled) + .value("started", m.started) .end() }) @@ -107,7 +107,7 @@ object JsonMarshalling { override def toJson(instance: Status.Instrumentation, builder: JavaStringBuilder): Unit = { val instrumentationObject = JsonWriter.on(builder) .`object`() - .value("isActive", instance.isIActive) + .value("active", instance.active) .`object`("modules") instance.modules.asScala.foreach { diff --git a/kamon-core/src/main/scala/kamon/status/Status.scala b/kamon-core/src/main/scala/kamon/status/Status.scala index 956e3594..ef5bb8eb 100644 --- a/kamon-core/src/main/scala/kamon/status/Status.scala +++ b/kamon-core/src/main/scala/kamon/status/Status.scala @@ -9,7 +9,8 @@ import kamon.module.Module.{Kind => ModuleKind} import java.util.{Collections, List => JavaList, Map => JavaMap} /** - * Exposes Kamon components' status information. This is meant to be used for informational and debugging purposes. + * Exposes Kamon components' status information. This is meant to be used for informational and debugging purposes and + * by no means should replace the use of reporters to extract information from Kamon. */ class Status(_moduleRegistry: ModuleRegistry, _metricRegistry: MetricRegistry, configuration: Configuration) { @@ -20,8 +21,8 @@ class Status(_moduleRegistry: ModuleRegistry, _metricRegistry: MetricRegistry, c Status.Settings(BuildInfo.version, Kamon.environment, configuration.config()) /** - * Status of the module registry. Describes what modules have been detected in the classpath and their current - * statuses. + * Status of the module registry. Describes what modules have been detected and registered, either from the classpath + * or programatically and their current status. */ def moduleRegistry(): Status.ModuleRegistry = _moduleRegistry.status() @@ -72,9 +73,9 @@ object Status { description: String, clazz: String, kind: ModuleKind, - isProgrammaticallyRegistered: Boolean, - isEnabled: Boolean, - isStarted: Boolean + programmaticallyRegistered: Boolean, + enabled: Boolean, + started: Boolean ) case class MetricRegistry( @@ -94,7 +95,7 @@ object Status { * outside Kamon. */ private[kamon] case class Instrumentation( - isIActive: Boolean, + active: Boolean, modules: JavaMap[String, String], errors: JavaMap[String, JavaList[Throwable]] ) diff --git a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala index 2784b87a..b2c7ff74 100644 --- a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala +++ b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala @@ -1,15 +1,19 @@ package kamon.status +import java.io.InputStream + import fi.iki.elonen.NanoHTTPD -import fi.iki.elonen.NanoHTTPD.Response import fi.iki.elonen.NanoHTTPD.Response.{Status => StatusCode} +import java.util.Collections +import java.util.concurrent.{ExecutorService, Executors} + +import scala.collection.JavaConverters.asScalaBufferConverter class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader, status: Status) extends NanoHTTPD(hostname, port) { private val RootResourceDirectory = "status" - private val ResourceExtensionRegex = ".*\\.([a-zA-Z]*)".r - + private val ResourceExtensionRegex = ".*\\.([a-zA-Z0-9]*)".r override def serve(session: NanoHTTPD.IHTTPSession): NanoHTTPD.Response = { if(session.getMethod() == NanoHTTPD.Method.GET) { @@ -25,44 +29,85 @@ class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader, } } else { + // Serve resources from the status page folder. - val resource = if (session.getUri() == "/") "/index.html" else session.getUri() - val resourcePath = RootResourceDirectory + resource + val requestedResource = if (session.getUri() == "/") "/index.html" else session.getUri() + val resourcePath = RootResourceDirectory + requestedResource val resourceStream = resourceLoader.getResourceAsStream(resourcePath) - if (resourceStream == null) NotFound else { - NanoHTTPD.newChunkedResponse(StatusCode.OK, mimeType(resource), resourceStream) - } + if (resourceStream == null) NotFound else resource(requestedResource, resourceStream) } } else NotAllowed } + override def start(): Unit = { + setAsyncRunner(new ThreadPoolRunner(Executors.newFixedThreadPool(2))) + start(NanoHTTPD.SOCKET_READ_TIMEOUT, false) + } + private def mimeType(resource: String): String = { val ResourceExtensionRegex(resourceExtension) = resource resourceExtension match { - case "css" => "text/css" - case "js" => "application/javascript" - case "ico" => "image/x-icon" - case "svg" => "image/svg+xml" - case "html" => "text/html" - case _ => "text/plain" + case "css" => "text/css" + case "js" => "application/javascript" + case "ico" => "image/x-icon" + case "svg" => "image/svg+xml" + case "html" => "text/html" + case "woff2" => "font/woff2" + case _ => "text/plain" } } - private def json[T : JsonMarshalling](instance: T): Response = { + private def json[T](instance: T)(implicit marshalling: JsonMarshalling[T]) = { val builder = new java.lang.StringBuilder() - implicitly[JsonMarshalling[T]].toJson(instance, builder) - NanoHTTPD.newFixedLengthResponse(StatusCode.OK, "application/json", builder.toString()) + marshalling.toJson(instance, builder) + + val response = NanoHTTPD.newFixedLengthResponse(StatusCode.OK, "application/json", builder.toString()) + response.closeConnection(true) + response + } + + private def resource(name: String, stream: InputStream) = { + val response = NanoHTTPD.newChunkedResponse(StatusCode.OK, mimeType(name), stream) + response.closeConnection(true) + response } private val NotAllowed = NanoHTTPD.newFixedLengthResponse( StatusCode.METHOD_NOT_ALLOWED, NanoHTTPD.MIME_PLAINTEXT, - "Only GET requests are allowed.") + "Only GET requests are allowed." + ) private val NotFound = NanoHTTPD.newFixedLengthResponse( StatusCode.NOT_FOUND, NanoHTTPD.MIME_PLAINTEXT, - "The requested resource was not found.") + "The requested resource was not found." + ) + + // Closing the connections will ensure that the thread pool will not be exhausted by keep alive + // connections from the browsers. + NotAllowed.closeConnection(true) + NotFound.closeConnection(true) + + + /** + * AsyncRunner that uses a thread pool for handling requests rather than spawning a new thread for each request (as + * the default runner does). + */ + private class ThreadPoolRunner(executorService: ExecutorService) extends NanoHTTPD.AsyncRunner { + final private val _openRequests = Collections.synchronizedList(new java.util.LinkedList[NanoHTTPD#ClientHandler]()) + + override def closeAll(): Unit = + _openRequests.asScala.foreach(_.close()) + + override def closed(clientHandler: NanoHTTPD#ClientHandler): Unit = + _openRequests.remove(clientHandler) + + override def exec(clientHandler: NanoHTTPD#ClientHandler): Unit = { + executorService.submit(clientHandler) + _openRequests.add(clientHandler) + } + } }
\ No newline at end of file |