aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-23 15:08:59 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-23 15:08:59 -0300
commit63f99d1f5cc7850fc7c559385b85fa1828d7cd79 (patch)
treeec38ed5146eedddd455d8525e7f9fa10679b8190
parent78badb3090a07fe6447255b6d3d690f0f8569feb (diff)
downloadKamon-63f99d1f5cc7850fc7c559385b85fa1828d7cd79.tar.gz
Kamon-63f99d1f5cc7850fc7c559385b85fa1828d7cd79.tar.bz2
Kamon-63f99d1f5cc7850fc7c559385b85fa1828d7cd79.zip
+ core,spray: create and implement http server metrics for spray, related to #56
-rw-r--r--kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala92
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala2
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/Spray.scala4
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala19
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala91
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala (renamed from kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala)33
6 files changed, 203 insertions, 38 deletions
diff --git a/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
new file mode 100644
index 00000000..3773e7d8
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/http/HttpServerMetrics.scala
@@ -0,0 +1,92 @@
+package kamon.http
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.instrument.Counter
+import kamon.metric._
+
+import scala.collection.concurrent.TrieMap
+
+object HttpServerMetrics extends MetricGroupIdentity {
+ val name: String = "http-server-metrics-recorder"
+ val category = new MetricGroupCategory {
+ val name: String = "http-server"
+ }
+
+ type TraceName = String
+ type StatusCode = String
+
+ case class CountPerStatusCode(statusCode: String) extends MetricIdentity {
+ def name: String = statusCode
+ }
+
+ case class TraceCountPerStatus(traceName: TraceName, statusCode: StatusCode) extends MetricIdentity {
+ def name: String = traceName + "_" + statusCode
+ }
+
+ class HttpServerMetricsRecorder extends MetricGroupRecorder {
+
+ private val counters = TrieMap[StatusCode, Counter]()
+ private val countersPerTrace = TrieMap[TraceName, TrieMap[StatusCode, Counter]]()
+
+ def recordResponse(statusCode: StatusCode): Unit = recordResponse(statusCode, 1L)
+
+ def recordResponse(statusCode: StatusCode, count: Long): Unit =
+ counters.getOrElseUpdate(statusCode, Counter()).increment(count)
+
+ def recordResponse(traceName: TraceName, statusCode: StatusCode): Unit = recordResponse(traceName, statusCode, 1L)
+
+ def recordResponse(traceName: TraceName, statusCode: StatusCode, count: Long): Unit = {
+ recordResponse(statusCode, count)
+ countersPerTrace.getOrElseUpdate(traceName, TrieMap()).getOrElseUpdate(statusCode, Counter()).increment(count)
+ }
+
+ def collect(context: CollectionContext): HttpServerMetricsSnapshot = {
+ val countsPerStatusCode = counters.map {
+ case (statusCode, counter) ⇒ (statusCode, counter.collect(context))
+ }.toMap
+
+ val countsPerTraceAndStatus = countersPerTrace.map {
+ case (traceName, countsPerStatus) ⇒
+ (traceName, countsPerStatus.map { case (statusCode, counter) ⇒ (statusCode, counter.collect(context)) }.toMap)
+ }.toMap
+
+ HttpServerMetricsSnapshot(countsPerStatusCode, countsPerTraceAndStatus)
+ }
+
+ def cleanup: Unit = {}
+ }
+
+ case class HttpServerMetricsSnapshot(countsPerStatusCode: Map[StatusCode, Counter.Snapshot],
+ countsPerTraceAndStatusCode: Map[TraceName, Map[StatusCode, Counter.Snapshot]]) extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = HttpServerMetricsSnapshot
+
+ def merge(that: HttpServerMetricsSnapshot, context: CollectionContext): HttpServerMetricsSnapshot = {
+ val combinedCountsPerStatus = combineMaps(countsPerStatusCode, that.countsPerStatusCode)((l, r) ⇒ l.merge(r, context))
+ val combinedCountsPerTraceAndStatus = combineMaps(countsPerTraceAndStatusCode, that.countsPerTraceAndStatusCode) {
+ (leftCounts, rightCounts) ⇒ combineMaps(leftCounts, rightCounts)((l, r) ⇒ l.merge(r, context))
+ }
+ HttpServerMetricsSnapshot(combinedCountsPerStatus, combinedCountsPerTraceAndStatus)
+ }
+
+ def metrics: Map[MetricIdentity, MetricSnapshot] = {
+ countsPerStatusCode.map {
+ case (statusCode, count) ⇒ (CountPerStatusCode(statusCode), count)
+ } ++ {
+ for (
+ (traceName, countsPerStatus) ← countsPerTraceAndStatusCode;
+ (statusCode, count) ← countsPerStatus
+ ) yield (TraceCountPerStatus(traceName, statusCode), count)
+ }
+ }
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = HttpServerMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): HttpServerMetricsRecorder =
+ new HttpServerMetricsRecorder()
+ }
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
index 1ee1eab4..187eb07d 100644
--- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -54,7 +54,7 @@ object TraceMetrics extends MetricGroupCategory {
type GroupSnapshotType = TraceMetricsSnapshot
def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot =
- TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty)
+ TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) // TODO: Merge the segments metrics correctly and test it!
def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime)
}
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
index 9de1882a..76adb214 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
@@ -19,6 +19,8 @@ package kamon.spray
import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
import akka.actor
import kamon.Kamon
+import kamon.http.HttpServerMetrics
+import kamon.metric.Metrics
import spray.http.HttpRequest
object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider {
@@ -38,6 +40,8 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte
val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation")
val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+ val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get
+ // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it.
val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
config.getString("client.segment-collection-strategy") match {
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
index 85782e22..69b0160e 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -22,7 +22,7 @@ import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
import akka.event.Logging.Warning
import scala.Some
import kamon.Kamon
-import kamon.spray.Spray
+import kamon.spray.{ SprayExtension, Spray }
import org.aspectj.lang.ProceedingJoinPoint
import spray.http.HttpHeaders.RawHeader
@@ -68,20 +68,21 @@ class ServerRequestInstrumentation {
val storedContext = openRequest.traceContext
verifyTraceContextConsistency(incomingContext, storedContext)
- val proceedResult = incomingContext match {
+ incomingContext match {
case None ⇒ pjp.proceed()
case Some(traceContext) ⇒
val sprayExtension = Kamon(Spray)(traceContext.system)
- if (sprayExtension.includeTraceToken) {
+ val proceedResult = if (sprayExtension.includeTraceToken) {
val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token)
pjp.proceed(Array(openRequest, responseWithHeader))
} else pjp.proceed
- }
- TraceRecorder.finish()
- proceedResult
+ TraceRecorder.finish()
+ recordHttpServerMetrics(response, traceContext.name, sprayExtension)
+ proceedResult
+ }
}
def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = {
@@ -102,6 +103,12 @@ class ServerRequestInstrumentation {
}
+ def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit =
+ response match {
+ case httpResponse: HttpResponse ⇒ sprayExtension.httpServerMetrics.recordResponse(traceName, httpResponse.status.intValue.toString)
+ case other ⇒ // Nothing to do then.
+ }
+
def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper =
response match {
case response: HttpResponse ⇒ response.withHeaders(response.headers ::: RawHeader(traceTokenHeaderName, token) :: Nil)
diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala
new file mode 100644
index 00000000..c4b370d7
--- /dev/null
+++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala
@@ -0,0 +1,91 @@
+package kamon.spray
+
+import akka.actor.ActorSystem
+import akka.testkit.{ TestProbe, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.http.HttpServerMetrics
+import kamon.metric._
+import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
+import org.scalatest.{ Matchers, WordSpecLike }
+import spray.http.{ StatusCodes, HttpResponse, HttpRequest }
+import spray.httpx.RequestBuilding
+
+class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
+ with ScalaFutures with PatienceConfiguration with TestServer {
+
+ val collectionContext = CollectionContext(100)
+
+ implicit lazy val system: ActorSystem = ActorSystem("spray-server-metrics-spec", ConfigFactory.parseString(
+ """
+ |akka {
+ | loglevel = ERROR
+ |}
+ |
+ |kamon {
+ | metrics {
+ | tick-interval = 1 hour
+ |
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = []
+ | }
+ | }
+ | ]
+ | }
+ |}
+ """.stripMargin))
+
+ "the Spray Server metrics instrumentation" should {
+ "record trace metrics for requests received" in {
+ Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext)
+ val (connection, server) = buildClientConnectionAndServer
+ val client = TestProbe()
+
+ for (repetition ← 1 to 10) {
+ client.send(connection, Get("/record-trace-metrics"))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ }
+
+ for (repetition ← 1 to 5) {
+ client.send(connection, Get("/record-trace-metrics"))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "bad-request", status = StatusCodes.BadRequest))
+ client.expectMsgType[HttpResponse]
+ }
+
+ val snapshot = Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext)
+ snapshot.elapsedTime.numberOfMeasurements should be(15)
+ }
+
+ "record http serve metrics for all the requests" in {
+ Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext)
+ val (connection, server) = buildClientConnectionAndServer
+ val client = TestProbe()
+
+ for (repetition ← 1 to 10) {
+ client.send(connection, Get("/record-http-metrics"))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ }
+
+ for (repetition ← 1 to 5) {
+ client.send(connection, Get("/record-http-metrics"))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "bad-request", status = StatusCodes.BadRequest))
+ client.expectMsgType[HttpResponse]
+ }
+
+ val snapshot = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext)
+ snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("200").count should be(10)
+ snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("400").count should be(5)
+ snapshot.countsPerStatusCode("200").count should be(10)
+ snapshot.countsPerStatusCode("400").count should be(5)
+ }
+ }
+}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala
index ab9116fd..48253b1d 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala
@@ -30,10 +30,10 @@ import com.typesafe.config.ConfigFactory
import kamon.metric.TraceMetrics.ElapsedTime
import kamon.metric.instrument.Histogram
-class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
+class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
with ScalaFutures with PatienceConfiguration with TestServer {
- implicit lazy val system: ActorSystem = ActorSystem("client-pipelining-segment-strategy-instrumentation-spec", ConfigFactory.parseString(
+ implicit lazy val system: ActorSystem = ActorSystem("spray-server-tracing-spec", ConfigFactory.parseString(
"""
|akka {
| loglevel = ERROR
@@ -52,12 +52,6 @@ class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
| }
| ]
| }
- |
- | spray {
- | client {
- | segment-collection-strategy = internal
- | }
- | }
|}
""".stripMargin))
@@ -104,29 +98,6 @@ class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
response.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled"))
}
-
- "open and finish a trace during the lifetime of a request" in {
- val (connection, server) = buildClientConnectionAndServer
- val client = TestProbe()
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
-
- client.send(connection, Get("/open-and-finish"))
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
-
- val tickSnapshot = metricListener.expectMsgType[TickMetricSnapshot]
- val traceMetrics = tickSnapshot.metrics.find { case (k, v) ⇒ k.name.contains("open-and-finish") } map (_._2.metrics)
- traceMetrics should not be empty
-
- traceMetrics map { metrics ⇒
- metrics(ElapsedTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L)
- }
- }
-
}
def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)