From 89d3057f8025add4b94b32c142e220ffb79f6c33 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 31 Oct 2014 02:45:41 +0100 Subject: + spray: external naming for traces and segments, related to #65 --- .../spray/ClientRequestInstrumentationSpec.scala | 214 ++++++++++++++------- .../src/test/scala/kamon/spray/TestServer.scala | 4 +- 2 files changed, 149 insertions(+), 69 deletions(-) (limited to 'kamon-spray/src/test/scala') diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index fbf69c8a..57f9ebe1 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -18,22 +18,23 @@ package kamon.spray import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.ActorSystem +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader import kamon.Kamon import kamon.metric.{ TraceMetrics, Metrics } -import spray.client.pipelining +import spray.client.pipelining.sendReceive import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ -import akka.pattern.pipe import kamon.metric.TraceMetrics.TraceMetricsSnapshot -class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { +class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( """ |akka { @@ -41,8 +42,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit |} | |kamon { + | spray { + | name-generator = kamon.spray.TestSprayNameGenerator + | } + | | metrics { - | tick-interval = 2 seconds + | tick-interval = 1 hour | | filters = [ | { @@ -57,19 +62,21 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit """.stripMargin)) implicit def ec = system.dispatcher + implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis)) - "the client instrumentation" when { - "configured to do automatic-trace-token-propagation" should { - "include the trace token header on spray-client requests" in { + "the spray client instrumentation" when { + "using the request-level api" should { + "include the trace token header if automatic-trace-token-propagation is enabled" in { enableAutomaticTraceTokenPropagation() - - val (hostConnector, server) = buildSHostConnectorAndServer - val client = TestProbe() + val (_, server, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { - client.send(hostConnector, Get("/dummy-path")) - TraceRecorder.currentContext + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") + } + + (TraceRecorder.currentContext, rF) } // Accept the connection at the server side @@ -82,20 +89,105 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) - client.expectMsgType[HttpResponse] + responseFuture.futureValue.entity.asString should be("ok") testContext.finish() } - } - "not configured to do automatic-trace-token-propagation" should { - "not include the trace token header on spray-client requests" in { + "not include the trace token header if automatic-trace-token-propagation is disabled" in { disableAutomaticTraceTokenPropagation() + val (_, server, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") + } + + (TraceRecorder.currentContext, rF) + } + + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. + server.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + } + + "start and finish a segment that must be named using the request level api name assignation" in { + enableAutomaticTraceTokenPropagation() + enablePipeliningSegmentCollectionStrategy() + + val transport = TestProbe() + val (_, _, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") { + val rF = sendReceive(transport.ref)(ec, 10.seconds) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") + } + + (TraceRecorder.currentContext, rF) + } + + // Receive the request and reply back + transport.expectMsgType[HttpRequest] + transport.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + + val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + } + + "rename a request level api segment once it reaches the relevant host connector" in { + enableAutomaticTraceTokenPropagation() + enablePipeliningSegmentCollectionStrategy() + + val (_, server, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") + } + + (TraceRecorder.currentContext, rF) + } - val (hostConnector, server) = buildSHostConnectorAndServer + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + + val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + } + } + + "using the host-level api" should { + "include the trace token header on spray-client requests if automatic-trace-token-propagation is enabled" in { + enableAutomaticTraceTokenPropagation() + enableInternalSegmentCollectionStrategy() + + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { client.send(hostConnector, Get("/dummy-path")) TraceRecorder.currentContext } @@ -106,30 +198,24 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] testContext.finish() } - } - "configured to use pipelining segment collection strategy" should { - "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in { - enablePipeliningSegmentCollectionStrategy() + "not include the trace token header on spray-client requests if automatic-trace-token-propagation is disabled" in { + disableAutomaticTraceTokenPropagation() + enableInternalSegmentCollectionStrategy() - val (hostConnector, server) = buildSHostConnectorAndServer + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() - val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds) - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") { - pipeline(Get("/dummy-path")) to client.ref + val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + client.send(hostConnector, Get("/dummy-path")) TraceRecorder.currentContext } @@ -138,39 +224,25 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit server.reply(Http.Register(server.ref)) // Receive the request and reply back - val req = server.expectMsgType[HttpRequest] + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - - // Finish the trace testContext.finish() - - val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) - traceMetrics.elapsedTime.numberOfMeasurements should be(1L) - traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) - recordedSegment should not be empty - recordedSegment map { segmentMetrics ⇒ - segmentMetrics.numberOfMeasurements should be(1L) - } } - } - "configured to use internal segment collection strategy" should { - "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in { + "start and finish a segment that must be named using the host level api name assignation" in { + disableAutomaticTraceTokenPropagation() enableInternalSegmentCollectionStrategy() - val (hostConnector, server) = buildSHostConnectorAndServer + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() - val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds) - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") { - pipeline(Get("/dummy-path")) to client.ref + val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") { + client.send(hostConnector, Get("/host-level-api-segment")) TraceRecorder.currentContext } @@ -179,21 +251,17 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit server.reply(Http.Register(server.ref)) // Receive the request and reply back - server.expectMsgType[HttpRequest] + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - - // Finish the trace testContext.finish() - val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) - traceMetrics.elapsedTime.numberOfMeasurements should be(1L) - traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) - recordedSegment should not be empty - recordedSegment map { segmentMetrics ⇒ - segmentMetrics.numberOfMeasurements should be(1L) - } + val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) } } } @@ -208,6 +276,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit metricsOption.get.asInstanceOf[TraceMetricsSnapshot] } + def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { + val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + recorder.get.collect(collectionContext) + } + def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal) def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining) def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) @@ -227,3 +301,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit field.set(target, include) } } + +class TestSprayNameGenerator extends SprayNameGenerator { + def generateTraceName(request: HttpRequest): String = request.uri.path.toString() + def generateRequestLevelApiSegmentName(request: HttpRequest): String = "request-level " + request.uri.path.toString() + def generateHostLevelApiSegmentName(request: HttpRequest): String = "host-level " + request.uri.path.toString() +} diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala index 65506770..379b8fc8 100644 --- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala +++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala @@ -45,13 +45,13 @@ trait TestServer { probe.sender } - def buildSHostConnectorAndServer: (ActorRef, TestProbe) = { + def buildSHostConnectorAndServer: (ActorRef, TestProbe, Http.Bound) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = httpHostConnector(bound) - (client, serverHandler) + (client, serverHandler, bound) } private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = { -- cgit v1.2.3