From 63f99d1f5cc7850fc7c559385b85fa1828d7cd79 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 23 Jul 2014 15:08:59 -0300 Subject: + core,spray: create and implement http server metrics for spray, related to #56 --- .../main/scala/kamon/http/HttpServerMetrics.scala | 92 +++++++++++++ .../src/main/scala/kamon/metric/TraceMetrics.scala | 2 +- kamon-spray/src/main/scala/kamon/spray/Spray.scala | 4 + .../can/server/ServerRequestInstrumentation.scala | 19 ++- .../spray/ServerRequestInstrumentationSpec.scala | 142 --------------------- .../scala/kamon/spray/SprayServerMetricsSpec.scala | 91 +++++++++++++ .../scala/kamon/spray/SprayServerTracingSpec.scala | 113 ++++++++++++++++ 7 files changed, 314 insertions(+), 149 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala delete mode 100644 kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala create mode 100644 kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala create mode 100644 kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala new file mode 100644 index 00000000..3773e7d8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala @@ -0,0 +1,92 @@ +package kamon.http + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.Counter +import kamon.metric._ + +import scala.collection.concurrent.TrieMap + +object HttpServerMetrics extends MetricGroupIdentity { + val name: String = "http-server-metrics-recorder" + val category = new MetricGroupCategory { + val name: String = "http-server" + } + + type TraceName = String + type StatusCode = String + + case class CountPerStatusCode(statusCode: String) extends MetricIdentity { + def name: String = statusCode + } + + case class TraceCountPerStatus(traceName: TraceName, statusCode: StatusCode) extends MetricIdentity { + def name: String = traceName + "_" + statusCode + } + + class HttpServerMetricsRecorder extends MetricGroupRecorder { + + private val counters = TrieMap[StatusCode, Counter]() + private val countersPerTrace = TrieMap[TraceName, TrieMap[StatusCode, Counter]]() + + def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L) + + def recordResponse(statusCode: StatusCode, count: Long): Unit = + counters.getOrElseUpdate(statusCode, Counter()).increment(count) + + def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L) + + def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = { + recordResponse(statusCode, count) + countersPerTrace.getOrElseUpdate(traceName, TrieMap()).getOrElseUpdate(statusCode, Counter()).increment(count) + } + + def collect(context: CollectionContext): HttpServerMetricsSnapshot = { + val countsPerStatusCode = counters.map { + case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) + }.toMap + + val countsPerTraceAndStatus = countersPerTrace.map { + case (traceName, countsPerStatus) ⇒ + (traceName, countsPerStatus.map { case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) }.toMap) + }.toMap + + HttpServerMetricsSnapshot(countsPerStatusCode, countsPerTraceAndStatus) + } + + def cleanup: Unit = {} + } + + case class HttpServerMetricsSnapshot(countsPerStatusCode: Map[StatusCode, Counter.Snapshot], + countsPerTraceAndStatusCode: Map[TraceName, Map[StatusCode, Counter.Snapshot]]) extends MetricGroupSnapshot { + + type GroupSnapshotType = HttpServerMetricsSnapshot + + def merge(that: HttpServerMetricsSnapshot, context: CollectionContext): HttpServerMetricsSnapshot = { + val combinedCountsPerStatus = combineMaps(countsPerStatusCode, that.countsPerStatusCode)((l, r) ⇒ l.merge(r, context)) + val combinedCountsPerTraceAndStatus = combineMaps(countsPerTraceAndStatusCode, that.countsPerTraceAndStatusCode) { + (leftCounts, rightCounts) ⇒ combineMaps(leftCounts, rightCounts)((l, r) ⇒ l.merge(r, context)) + } + HttpServerMetricsSnapshot(combinedCountsPerStatus, combinedCountsPerTraceAndStatus) + } + + def metrics: Map[MetricIdentity, MetricSnapshot] = { + countsPerStatusCode.map { + case (statusCode, count) ⇒ (CountPerStatusCode(statusCode), count) + } ++ { + for ( + (traceName, countsPerStatus) ← countsPerTraceAndStatusCode; + (statusCode, count) ← countsPerStatus + ) yield (TraceCountPerStatus(traceName, statusCode), count) + } + } + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = HttpServerMetricsRecorder + + def create(config: Config, system: ActorSystem): HttpServerMetricsRecorder = + new HttpServerMetricsRecorder() + } + +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 1ee1eab4..187eb07d 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -54,7 +54,7 @@ object TraceMetrics extends MetricGroupCategory { type GroupSnapshotType = TraceMetricsSnapshot def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) // TODO: Merge the segments metrics correctly and test it! def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) } diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala index 9de1882a..76adb214 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala @@ -19,6 +19,8 @@ package kamon.spray import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } import akka.actor import kamon.Kamon +import kamon.http.HttpServerMetrics +import kamon.metric.Metrics import spray.http.HttpRequest object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider { @@ -38,6 +40,8 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation") val traceTokenHeaderName: String = config.getString("trace-token-header-name") + val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get + // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it. val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy = config.getString("client.segment-collection-strategy") match { diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index 85782e22..69b0160e 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -22,7 +22,7 @@ import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } import akka.event.Logging.Warning import scala.Some import kamon.Kamon -import kamon.spray.Spray +import kamon.spray.{ SprayExtension, Spray } import org.aspectj.lang.ProceedingJoinPoint import spray.http.HttpHeaders.RawHeader @@ -68,20 +68,21 @@ class ServerRequestInstrumentation { val storedContext = openRequest.traceContext verifyTraceContextConsistency(incomingContext, storedContext) - val proceedResult = incomingContext match { + incomingContext match { case None ⇒ pjp.proceed() case Some(traceContext) ⇒ val sprayExtension = Kamon(Spray)(traceContext.system) - if (sprayExtension.includeTraceToken) { + val proceedResult = if (sprayExtension.includeTraceToken) { val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token) pjp.proceed(Array(openRequest, responseWithHeader)) } else pjp.proceed - } - TraceRecorder.finish() - proceedResult + TraceRecorder.finish() + recordHttpServerMetrics(response, traceContext.name, sprayExtension) + proceedResult + } } def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = { @@ -102,6 +103,12 @@ class ServerRequestInstrumentation { } + def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = + response match { + case httpResponse: HttpResponse ⇒ sprayExtension.httpServerMetrics.recordResponse(traceName, httpResponse.status.intValue.toString) + case other ⇒ // Nothing to do then. + } + def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper = response match { case response: HttpResponse ⇒ response.withHeaders(response.headers ::: RawHeader(traceTokenHeaderName, token) :: Nil) diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala deleted file mode 100644 index ab9116fd..00000000 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.spray - -import _root_.spray.httpx.RequestBuilding -import akka.testkit.{ TestKitBase, TestProbe } -import akka.actor.ActorSystem -import org.scalatest.{ Matchers, WordSpecLike } -import kamon.Kamon -import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } -import spray.http.HttpHeaders.RawHeader -import spray.http.{ HttpResponse, HttpRequest } -import kamon.metric.{ TraceMetrics, Metrics } -import kamon.metric.Subscriptions.TickMetricSnapshot -import com.typesafe.config.ConfigFactory -import kamon.metric.TraceMetrics.ElapsedTime -import kamon.metric.instrument.Histogram - -class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding - with ScalaFutures with PatienceConfiguration with TestServer { - - implicit lazy val system: ActorSystem = ActorSystem("client-pipelining-segment-strategy-instrumentation-spec", ConfigFactory.parseString( - """ - |akka { - | loglevel = ERROR - |} - | - |kamon { - | metrics { - | tick-interval = 2 seconds - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [] - | } - | } - | ] - | } - | - | spray { - | client { - | segment-collection-strategy = internal - | } - | } - |} - """.stripMargin)) - - "the spray server request tracing instrumentation" should { - "include the trace-token header in responses when the automatic-trace-token-propagation is enabled" in { - enableAutomaticTraceTokenPropagation() - - val (connection, server) = buildClientConnectionAndServer - val client = TestProbe() - - client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled"))) - server.expectMsgType[HttpRequest] - server.reply(HttpResponse(entity = "ok")) - val response = client.expectMsgType[HttpResponse] - - response.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled")) - } - - "reply back with an automatically assigned trace token if none was provided with the request and automatic-trace-token-propagation is enabled" in { - enableAutomaticTraceTokenPropagation() - - val (connection, server) = buildClientConnectionAndServer - val client = TestProbe() - - client.send(connection, Get("/")) - server.expectMsgType[HttpRequest] - server.reply(HttpResponse(entity = "ok")) - val response = client.expectMsgType[HttpResponse] - - response.headers.filter(_.name == Kamon(Spray).traceTokenHeaderName).size should be(1) - - } - - "not include the trace-token header in responses when the automatic-trace-token-propagation is disabled" in { - disableAutomaticTraceTokenPropagation() - - val (connection, server) = buildClientConnectionAndServer - val client = TestProbe() - - client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled"))) - server.expectMsgType[HttpRequest] - server.reply(HttpResponse(entity = "ok")) - val response = client.expectMsgType[HttpResponse] - - response.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled")) - } - - "open and finish a trace during the lifetime of a request" in { - val (connection, server) = buildClientConnectionAndServer - val client = TestProbe() - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] - - client.send(connection, Get("/open-and-finish")) - server.expectMsgType[HttpRequest] - server.reply(HttpResponse(entity = "ok")) - client.expectMsgType[HttpResponse] - - val tickSnapshot = metricListener.expectMsgType[TickMetricSnapshot] - val traceMetrics = tickSnapshot.metrics.find { case (k, v) ⇒ k.name.contains("open-and-finish") } map (_._2.metrics) - traceMetrics should not be empty - - traceMetrics map { metrics ⇒ - metrics(ElapsedTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - } - } - - } - - def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) - def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false) - - def setIncludeTraceToken(include: Boolean): Unit = { - val target = Kamon(Spray)(system) - val field = target.getClass.getDeclaredField("includeTraceToken") - field.setAccessible(true) - field.set(target, include) - } - -} diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala new file mode 100644 index 00000000..c4b370d7 --- /dev/null +++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala @@ -0,0 +1,91 @@ +package kamon.spray + +import akka.actor.ActorSystem +import akka.testkit.{ TestProbe, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.http.HttpServerMetrics +import kamon.metric._ +import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } +import org.scalatest.{ Matchers, WordSpecLike } +import spray.http.{ StatusCodes, HttpResponse, HttpRequest } +import spray.httpx.RequestBuilding + +class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding + with ScalaFutures with PatienceConfiguration with TestServer { + + val collectionContext = CollectionContext(100) + + implicit lazy val system: ActorSystem = ActorSystem("spray-server-metrics-spec", ConfigFactory.parseString( + """ + |akka { + | loglevel = ERROR + |} + | + |kamon { + | metrics { + | tick-interval = 1 hour + | + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [] + | } + | } + | ] + | } + |} + """.stripMargin)) + + "the Spray Server metrics instrumentation" should { + "record trace metrics for requests received" in { + Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext) + val (connection, server) = buildClientConnectionAndServer + val client = TestProbe() + + for (repetition ← 1 to 10) { + client.send(connection, Get("/record-trace-metrics")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + client.expectMsgType[HttpResponse] + } + + for (repetition ← 1 to 5) { + client.send(connection, Get("/record-trace-metrics")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "bad-request", status = StatusCodes.BadRequest)) + client.expectMsgType[HttpResponse] + } + + val snapshot = Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext) + snapshot.elapsedTime.numberOfMeasurements should be(15) + } + + "record http serve metrics for all the requests" in { + Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext) + val (connection, server) = buildClientConnectionAndServer + val client = TestProbe() + + for (repetition ← 1 to 10) { + client.send(connection, Get("/record-http-metrics")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + client.expectMsgType[HttpResponse] + } + + for (repetition ← 1 to 5) { + client.send(connection, Get("/record-http-metrics")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "bad-request", status = StatusCodes.BadRequest)) + client.expectMsgType[HttpResponse] + } + + val snapshot = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext) + snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("200").count should be(10) + snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("400").count should be(5) + snapshot.countsPerStatusCode("200").count should be(10) + snapshot.countsPerStatusCode("400").count should be(5) + } + } +} diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala new file mode 100644 index 00000000..48253b1d --- /dev/null +++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala @@ -0,0 +1,113 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spray + +import _root_.spray.httpx.RequestBuilding +import akka.testkit.{ TestKitBase, TestProbe } +import akka.actor.ActorSystem +import org.scalatest.{ Matchers, WordSpecLike } +import kamon.Kamon +import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } +import spray.http.HttpHeaders.RawHeader +import spray.http.{ HttpResponse, HttpRequest } +import kamon.metric.{ TraceMetrics, Metrics } +import kamon.metric.Subscriptions.TickMetricSnapshot +import com.typesafe.config.ConfigFactory +import kamon.metric.TraceMetrics.ElapsedTime +import kamon.metric.instrument.Histogram + +class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding + with ScalaFutures with PatienceConfiguration with TestServer { + + implicit lazy val system: ActorSystem = ActorSystem("spray-server-tracing-spec", ConfigFactory.parseString( + """ + |akka { + | loglevel = ERROR + |} + | + |kamon { + | metrics { + | tick-interval = 2 seconds + | + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [] + | } + | } + | ] + | } + |} + """.stripMargin)) + + "the spray server request tracing instrumentation" should { + "include the trace-token header in responses when the automatic-trace-token-propagation is enabled" in { + enableAutomaticTraceTokenPropagation() + + val (connection, server) = buildClientConnectionAndServer + val client = TestProbe() + + client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled"))) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + val response = client.expectMsgType[HttpResponse] + + response.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled")) + } + + "reply back with an automatically assigned trace token if none was provided with the request and automatic-trace-token-propagation is enabled" in { + enableAutomaticTraceTokenPropagation() + + val (connection, server) = buildClientConnectionAndServer + val client = TestProbe() + + client.send(connection, Get("/")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + val response = client.expectMsgType[HttpResponse] + + response.headers.filter(_.name == Kamon(Spray).traceTokenHeaderName).size should be(1) + + } + + "not include the trace-token header in responses when the automatic-trace-token-propagation is disabled" in { + disableAutomaticTraceTokenPropagation() + + val (connection, server) = buildClientConnectionAndServer + val client = TestProbe() + + client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled"))) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + val response = client.expectMsgType[HttpResponse] + + response.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled")) + } + } + + def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) + def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false) + + def setIncludeTraceToken(include: Boolean): Unit = { + val target = Kamon(Spray)(system) + val field = target.getClass.getDeclaredField("includeTraceToken") + field.setAccessible(true) + field.set(target, include) + } + +} -- cgit v1.2.3