aboutsummaryrefslogtreecommitdiff
path: root/kamon-status-page/src/main/scala/kamon
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-status-page/src/main/scala/kamon')
-rw-r--r--kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala135
-rw-r--r--kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala69
-rw-r--r--kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala117
3 files changed, 321 insertions, 0 deletions
diff --git a/kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala b/kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala
new file mode 100644
index 00000000..0a66fcac
--- /dev/null
+++ b/kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala
@@ -0,0 +1,135 @@
+package kamon.status.page
+
+import java.lang.{StringBuilder => JavaStringBuilder}
+
+import com.grack.nanojson.JsonWriter
+import com.typesafe.config.ConfigRenderOptions
+import kamon.module.Module
+import kamon.status.Status
+
+import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsScalaMapConverter}
+
+
+trait JsonMarshalling[T] {
+
+ /**
+ * Implementations should append a Json object or array that describes the given instance members and any
+ * additional information that is expected to be shown in the status mini site.
+ */
+ def toJson(instance: T, builder: JavaStringBuilder): Unit
+}
+
+object JsonMarshalling {
+
+ implicit object ModuleRegistryStatusJsonMarshalling extends JsonMarshalling[Status.ModuleRegistry] {
+ override def toJson(instance: Status.ModuleRegistry, builder: JavaStringBuilder): Unit = {
+ def moduleKindString(moduleKind: Module.Kind): String = moduleKind match {
+ case Module.Kind.Combined => "combined"
+ case Module.Kind.Metric => "metric"
+ case Module.Kind.Span => "span"
+ case Module.Kind.Plain => "plain"
+ }
+
+ val array = JsonWriter.on(builder)
+ .`object`()
+ .array("modules")
+
+ instance.modules.foreach(m => {
+ array.`object`()
+ .value("name", m.name)
+ .value("description", m.description)
+ .value("clazz", m.clazz)
+ .value("kind", moduleKindString(m.kind))
+ .value("programmaticallyRegistered", m.programmaticallyRegistered)
+ .value("enabled", m.enabled)
+ .value("started", m.started)
+ .end()
+ })
+
+ array.end().end().done()
+ }
+ }
+
+ implicit object BaseInfoJsonMarshalling extends JsonMarshalling[Status.Settings] {
+ override def toJson(instance: Status.Settings, builder: JavaStringBuilder): Unit = {
+ val baseConfigJson = JsonWriter.on(builder)
+ .`object`()
+ .value("version", instance.version)
+ .value("config", instance.config.root().render(ConfigRenderOptions.concise()))
+
+ baseConfigJson.`object`("environment")
+ .value("service", instance.environment.service)
+ .value("host", instance.environment.host)
+ .value("instance", instance.environment.instance)
+ .`object`("tags")
+
+ instance.environment.tags.foreach {
+ case (key, value) => baseConfigJson.value(key, value)
+ }
+
+ baseConfigJson
+ .end() // ends tags
+ .end() // ends environment
+ .end() // ends base config
+ .done()
+ }
+ }
+
+ implicit object MetricRegistryStatusJsonMarshalling extends JsonMarshalling[Status.MetricRegistry] {
+ override def toJson(instance: Status.MetricRegistry, builder: JavaStringBuilder): Unit = {
+ val metricsObject = JsonWriter.on(builder)
+ .`object`
+ .array("metrics")
+
+ instance.metrics.foreach(metric => {
+ metricsObject
+ .`object`()
+ .value("name", metric.name)
+ .value("type", metric.instrumentType.name)
+ .value("unitDimension", metric.unit.dimension.name)
+ .value("unitMagnitude", metric.unit.magnitude.name)
+ .`object`("tags")
+
+ metric.tags.foreach { case (tag, value) => metricsObject.value(tag, value) }
+
+ metricsObject
+ .end() // tags
+ .end() // metric info
+ })
+
+ metricsObject
+ .end() // metrics array
+ .end() // object
+ .done()
+ }
+ }
+
+ implicit object InstrumentationStatusJsonMarshalling extends JsonMarshalling[Status.Instrumentation] {
+ override def toJson(instance: Status.Instrumentation, builder: JavaStringBuilder): Unit = {
+ val instrumentationObject = JsonWriter.on(builder)
+ .`object`()
+ .value("active", instance.active)
+ .`object`("modules")
+
+ instance.modules.asScala.foreach {
+ case (moduleName, moduleDescription) => instrumentationObject.value(moduleName, moduleDescription)
+ }
+
+ instrumentationObject
+ .end() // end modules
+ .`object`("errors")
+
+ instance.errors.asScala.foreach {
+ case (moduleName, errors) =>
+ instrumentationObject.array(moduleName)
+ errors.asScala.foreach(t => instrumentationObject.value(t.toString))
+ instrumentationObject.end()
+ }
+
+ instrumentationObject
+ .end() // errors
+ .end() // object
+ .done()
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala b/kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala
new file mode 100644
index 00000000..3a195e28
--- /dev/null
+++ b/kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala
@@ -0,0 +1,69 @@
+package kamon
+package status
+package page
+
+import com.typesafe.config.Config
+import kamon.module.Module
+import org.slf4j.LoggerFactory
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Uses an embedded web server to publish a simple status page with information about Kamon's internal status.
+ */
+class StatusPage extends Module {
+ private val _logger = LoggerFactory.getLogger(classOf[StatusPage])
+ @volatile private var _statusPageServer: Option[StatusPageServer] = None
+
+ override def start(): Unit =
+ init(Kamon.config())
+
+ override def stop(): Unit =
+ stopServer()
+
+ override def reconfigure(newConfig: Config): Unit =
+ init(newConfig)
+
+
+ private def init(config: Config): Unit = synchronized {
+ val listenConfig = config.getConfig("kamon.status-page.listen")
+ val hostname = listenConfig.getString("hostname")
+ val port = listenConfig.getInt("port")
+
+ _statusPageServer.fold {
+ // Starting a new server on the configured hostname/port
+ startServer(hostname, port, Kamon.classLoader())
+
+ }(existentServer => {
+ // If the configuration has changed we will stop the previous version
+ // and start a new one with the new hostname/port.
+
+ if(existentServer.getHostname != hostname || existentServer.getListeningPort != port) {
+ stopServer()
+ startServer(hostname, port, Kamon.classLoader())
+ }
+ })
+ }
+
+ private def startServer(hostname: String, port: Int, resourceLoader: ClassLoader): Unit = {
+ Try {
+ val server = new StatusPageServer(hostname, port, resourceLoader, Kamon.status())
+ server.start()
+ server
+
+ } match {
+ case Success(server) =>
+ _logger.info(s"Status page started on http://$hostname:$port/")
+ _statusPageServer = Some(server)
+
+ case Failure(t) =>
+ _logger.error("Failed to start the status page embedded server", t)
+ }
+ }
+
+ private def stopServer(): Unit = {
+ _statusPageServer.foreach(_.stop())
+ _statusPageServer = None
+ }
+
+}
diff --git a/kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala b/kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala
new file mode 100644
index 00000000..cf6588bf
--- /dev/null
+++ b/kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala
@@ -0,0 +1,117 @@
+package kamon.status.page
+
+import java.io.InputStream
+import java.util.Collections
+import java.util.concurrent.{ExecutorService, Executors}
+
+import fi.iki.elonen.NanoHTTPD
+import fi.iki.elonen.NanoHTTPD.Response.{Status => StatusCode}
+import kamon.status.Status
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+/**
+ * Exposes an embedded HTTP server based on NanoHTTP.
+ */
+class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader, status: Status)
+ extends NanoHTTPD(hostname, port) {
+
+ private val RootResourceDirectory = "status-page"
+ private val ResourceExtensionRegex = ".*\\.([a-zA-Z0-9]*)".r
+
+ override def serve(session: NanoHTTPD.IHTTPSession): NanoHTTPD.Response = {
+ if(session.getMethod() == NanoHTTPD.Method.GET) {
+ if(session.getUri().startsWith("/status")) {
+
+ // Serve the current status data on Json.
+ session.getUri() match {
+ case "/status/settings" => json(status.settings())
+ case "/status/modules" => json(status.moduleRegistry())
+ case "/status/metrics" => json(status.metricRegistry())
+ case "/status/instrumentation" => json(status.instrumentation())
+ case _ => NotFound
+ }
+
+ } else {
+
+ // Serve resources from the status page folder.
+ val requestedResource = if (session.getUri() == "/") "/index.html" else session.getUri()
+ val resourcePath = RootResourceDirectory + requestedResource
+ val resourceStream = resourceLoader.getResourceAsStream(resourcePath)
+
+ if (resourceStream == null) NotFound else resource(requestedResource, resourceStream)
+ }
+
+ } else NotAllowed
+ }
+
+ override def start(): Unit = {
+ setAsyncRunner(new ThreadPoolRunner(Executors.newFixedThreadPool(2)))
+ start(NanoHTTPD.SOCKET_READ_TIMEOUT, false)
+ }
+
+ private def mimeType(resource: String): String = {
+ val ResourceExtensionRegex(resourceExtension) = resource
+ resourceExtension match {
+ case "css" => "text/css"
+ case "js" => "application/javascript"
+ case "ico" => "image/x-icon"
+ case "svg" => "image/svg+xml"
+ case "html" => "text/html"
+ case "woff2" => "font/woff2"
+ case _ => "text/plain"
+ }
+ }
+
+ private def json[T](instance: T)(implicit marshalling: JsonMarshalling[T]) = {
+ val builder = new java.lang.StringBuilder()
+ marshalling.toJson(instance, builder)
+
+ val response = NanoHTTPD.newFixedLengthResponse(StatusCode.OK, "application/json", builder.toString())
+ response.closeConnection(true)
+ response
+ }
+
+ private def resource(name: String, stream: InputStream) = {
+ val response = NanoHTTPD.newChunkedResponse(StatusCode.OK, mimeType(name), stream)
+ response.closeConnection(true)
+ response
+ }
+
+ private val NotAllowed = NanoHTTPD.newFixedLengthResponse(
+ StatusCode.METHOD_NOT_ALLOWED,
+ NanoHTTPD.MIME_PLAINTEXT,
+ "Only GET requests are allowed."
+ )
+
+ private val NotFound = NanoHTTPD.newFixedLengthResponse(
+ StatusCode.NOT_FOUND,
+ NanoHTTPD.MIME_PLAINTEXT,
+ "The requested resource was not found."
+ )
+
+ // Closing the connections will ensure that the thread pool will not be exhausted by keep alive
+ // connections from the browsers.
+ NotAllowed.closeConnection(true)
+ NotFound.closeConnection(true)
+
+
+ /**
+ * AsyncRunner that uses a thread pool for handling requests rather than spawning a new thread for each request (as
+ * the default runner does).
+ */
+ private class ThreadPoolRunner(executorService: ExecutorService) extends NanoHTTPD.AsyncRunner {
+ final private val _openRequests = Collections.synchronizedList(new java.util.LinkedList[NanoHTTPD#ClientHandler]())
+
+ override def closeAll(): Unit =
+ _openRequests.asScala.foreach(_.close())
+
+ override def closed(clientHandler: NanoHTTPD#ClientHandler): Unit =
+ _openRequests.remove(clientHandler)
+
+ override def exec(clientHandler: NanoHTTPD#ClientHandler): Unit = {
+ executorService.submit(clientHandler)
+ _openRequests.add(clientHandler)
+ }
+ }
+} \ No newline at end of file