diff options
Diffstat (limited to 'kamon-status-page/src/main/scala/kamon/status')
3 files changed, 321 insertions, 0 deletions
diff --git a/kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala b/kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala new file mode 100644 index 00000000..0a66fcac --- /dev/null +++ b/kamon-status-page/src/main/scala/kamon/status/page/JsonMarshalling.scala @@ -0,0 +1,135 @@ +package kamon.status.page + +import java.lang.{StringBuilder => JavaStringBuilder} + +import com.grack.nanojson.JsonWriter +import com.typesafe.config.ConfigRenderOptions +import kamon.module.Module +import kamon.status.Status + +import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsScalaMapConverter} + + +trait JsonMarshalling[T] { + + /** + * Implementations should append a Json object or array that describes the given instance members and any + * additional information that is expected to be shown in the status mini site. + */ + def toJson(instance: T, builder: JavaStringBuilder): Unit +} + +object JsonMarshalling { + + implicit object ModuleRegistryStatusJsonMarshalling extends JsonMarshalling[Status.ModuleRegistry] { + override def toJson(instance: Status.ModuleRegistry, builder: JavaStringBuilder): Unit = { + def moduleKindString(moduleKind: Module.Kind): String = moduleKind match { + case Module.Kind.Combined => "combined" + case Module.Kind.Metric => "metric" + case Module.Kind.Span => "span" + case Module.Kind.Plain => "plain" + } + + val array = JsonWriter.on(builder) + .`object`() + .array("modules") + + instance.modules.foreach(m => { + array.`object`() + .value("name", m.name) + .value("description", m.description) + .value("clazz", m.clazz) + .value("kind", moduleKindString(m.kind)) + .value("programmaticallyRegistered", m.programmaticallyRegistered) + .value("enabled", m.enabled) + .value("started", m.started) + .end() + }) + + array.end().end().done() + } + } + + implicit object BaseInfoJsonMarshalling extends JsonMarshalling[Status.Settings] { + override def toJson(instance: Status.Settings, builder: JavaStringBuilder): Unit = { + val baseConfigJson = JsonWriter.on(builder) + .`object`() + .value("version", instance.version) + .value("config", instance.config.root().render(ConfigRenderOptions.concise())) + + baseConfigJson.`object`("environment") + .value("service", instance.environment.service) + .value("host", instance.environment.host) + .value("instance", instance.environment.instance) + .`object`("tags") + + instance.environment.tags.foreach { + case (key, value) => baseConfigJson.value(key, value) + } + + baseConfigJson + .end() // ends tags + .end() // ends environment + .end() // ends base config + .done() + } + } + + implicit object MetricRegistryStatusJsonMarshalling extends JsonMarshalling[Status.MetricRegistry] { + override def toJson(instance: Status.MetricRegistry, builder: JavaStringBuilder): Unit = { + val metricsObject = JsonWriter.on(builder) + .`object` + .array("metrics") + + instance.metrics.foreach(metric => { + metricsObject + .`object`() + .value("name", metric.name) + .value("type", metric.instrumentType.name) + .value("unitDimension", metric.unit.dimension.name) + .value("unitMagnitude", metric.unit.magnitude.name) + .`object`("tags") + + metric.tags.foreach { case (tag, value) => metricsObject.value(tag, value) } + + metricsObject + .end() // tags + .end() // metric info + }) + + metricsObject + .end() // metrics array + .end() // object + .done() + } + } + + implicit object InstrumentationStatusJsonMarshalling extends JsonMarshalling[Status.Instrumentation] { + override def toJson(instance: Status.Instrumentation, builder: JavaStringBuilder): Unit = { + val instrumentationObject = JsonWriter.on(builder) + .`object`() + .value("active", instance.active) + .`object`("modules") + + instance.modules.asScala.foreach { + case (moduleName, moduleDescription) => instrumentationObject.value(moduleName, moduleDescription) + } + + instrumentationObject + .end() // end modules + .`object`("errors") + + instance.errors.asScala.foreach { + case (moduleName, errors) => + instrumentationObject.array(moduleName) + errors.asScala.foreach(t => instrumentationObject.value(t.toString)) + instrumentationObject.end() + } + + instrumentationObject + .end() // errors + .end() // object + .done() + } + } +}
\ No newline at end of file diff --git a/kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala b/kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala new file mode 100644 index 00000000..3a195e28 --- /dev/null +++ b/kamon-status-page/src/main/scala/kamon/status/page/StatusPage.scala @@ -0,0 +1,69 @@ +package kamon +package status +package page + +import com.typesafe.config.Config +import kamon.module.Module +import org.slf4j.LoggerFactory + +import scala.util.{Failure, Success, Try} + +/** + * Uses an embedded web server to publish a simple status page with information about Kamon's internal status. + */ +class StatusPage extends Module { + private val _logger = LoggerFactory.getLogger(classOf[StatusPage]) + @volatile private var _statusPageServer: Option[StatusPageServer] = None + + override def start(): Unit = + init(Kamon.config()) + + override def stop(): Unit = + stopServer() + + override def reconfigure(newConfig: Config): Unit = + init(newConfig) + + + private def init(config: Config): Unit = synchronized { + val listenConfig = config.getConfig("kamon.status-page.listen") + val hostname = listenConfig.getString("hostname") + val port = listenConfig.getInt("port") + + _statusPageServer.fold { + // Starting a new server on the configured hostname/port + startServer(hostname, port, Kamon.classLoader()) + + }(existentServer => { + // If the configuration has changed we will stop the previous version + // and start a new one with the new hostname/port. + + if(existentServer.getHostname != hostname || existentServer.getListeningPort != port) { + stopServer() + startServer(hostname, port, Kamon.classLoader()) + } + }) + } + + private def startServer(hostname: String, port: Int, resourceLoader: ClassLoader): Unit = { + Try { + val server = new StatusPageServer(hostname, port, resourceLoader, Kamon.status()) + server.start() + server + + } match { + case Success(server) => + _logger.info(s"Status page started on http://$hostname:$port/") + _statusPageServer = Some(server) + + case Failure(t) => + _logger.error("Failed to start the status page embedded server", t) + } + } + + private def stopServer(): Unit = { + _statusPageServer.foreach(_.stop()) + _statusPageServer = None + } + +} diff --git a/kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala b/kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala new file mode 100644 index 00000000..cf6588bf --- /dev/null +++ b/kamon-status-page/src/main/scala/kamon/status/page/StatusPageServer.scala @@ -0,0 +1,117 @@ +package kamon.status.page + +import java.io.InputStream +import java.util.Collections +import java.util.concurrent.{ExecutorService, Executors} + +import fi.iki.elonen.NanoHTTPD +import fi.iki.elonen.NanoHTTPD.Response.{Status => StatusCode} +import kamon.status.Status + +import scala.collection.JavaConverters.asScalaBufferConverter + +/** + * Exposes an embedded HTTP server based on NanoHTTP. + */ +class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader, status: Status) + extends NanoHTTPD(hostname, port) { + + private val RootResourceDirectory = "status-page" + private val ResourceExtensionRegex = ".*\\.([a-zA-Z0-9]*)".r + + override def serve(session: NanoHTTPD.IHTTPSession): NanoHTTPD.Response = { + if(session.getMethod() == NanoHTTPD.Method.GET) { + if(session.getUri().startsWith("/status")) { + + // Serve the current status data on Json. + session.getUri() match { + case "/status/settings" => json(status.settings()) + case "/status/modules" => json(status.moduleRegistry()) + case "/status/metrics" => json(status.metricRegistry()) + case "/status/instrumentation" => json(status.instrumentation()) + case _ => NotFound + } + + } else { + + // Serve resources from the status page folder. + val requestedResource = if (session.getUri() == "/") "/index.html" else session.getUri() + val resourcePath = RootResourceDirectory + requestedResource + val resourceStream = resourceLoader.getResourceAsStream(resourcePath) + + if (resourceStream == null) NotFound else resource(requestedResource, resourceStream) + } + + } else NotAllowed + } + + override def start(): Unit = { + setAsyncRunner(new ThreadPoolRunner(Executors.newFixedThreadPool(2))) + start(NanoHTTPD.SOCKET_READ_TIMEOUT, false) + } + + private def mimeType(resource: String): String = { + val ResourceExtensionRegex(resourceExtension) = resource + resourceExtension match { + case "css" => "text/css" + case "js" => "application/javascript" + case "ico" => "image/x-icon" + case "svg" => "image/svg+xml" + case "html" => "text/html" + case "woff2" => "font/woff2" + case _ => "text/plain" + } + } + + private def json[T](instance: T)(implicit marshalling: JsonMarshalling[T]) = { + val builder = new java.lang.StringBuilder() + marshalling.toJson(instance, builder) + + val response = NanoHTTPD.newFixedLengthResponse(StatusCode.OK, "application/json", builder.toString()) + response.closeConnection(true) + response + } + + private def resource(name: String, stream: InputStream) = { + val response = NanoHTTPD.newChunkedResponse(StatusCode.OK, mimeType(name), stream) + response.closeConnection(true) + response + } + + private val NotAllowed = NanoHTTPD.newFixedLengthResponse( + StatusCode.METHOD_NOT_ALLOWED, + NanoHTTPD.MIME_PLAINTEXT, + "Only GET requests are allowed." + ) + + private val NotFound = NanoHTTPD.newFixedLengthResponse( + StatusCode.NOT_FOUND, + NanoHTTPD.MIME_PLAINTEXT, + "The requested resource was not found." + ) + + // Closing the connections will ensure that the thread pool will not be exhausted by keep alive + // connections from the browsers. + NotAllowed.closeConnection(true) + NotFound.closeConnection(true) + + + /** + * AsyncRunner that uses a thread pool for handling requests rather than spawning a new thread for each request (as + * the default runner does). + */ + private class ThreadPoolRunner(executorService: ExecutorService) extends NanoHTTPD.AsyncRunner { + final private val _openRequests = Collections.synchronizedList(new java.util.LinkedList[NanoHTTPD#ClientHandler]()) + + override def closeAll(): Unit = + _openRequests.asScala.foreach(_.close()) + + override def closed(clientHandler: NanoHTTPD#ClientHandler): Unit = + _openRequests.remove(clientHandler) + + override def exec(clientHandler: NanoHTTPD#ClientHandler): Unit = { + executorService.submit(clientHandler) + _openRequests.add(clientHandler) + } + } +}
\ No newline at end of file |