From 89d3057f8025add4b94b32c142e220ffb79f6c33 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 31 Oct 2014 02:45:41 +0100 Subject: + spray: external naming for traces and segments, related to #65 --- .../akka/ActorCellInstrumentation.scala | 8 +- .../src/main/scala/kamon/trace/TraceContext.scala | 16 +- kamon-spray/src/main/resources/reference.conf | 3 + kamon-spray/src/main/scala/kamon/spray/Spray.scala | 20 +- .../can/client/ClientRequestInstrumentation.scala | 121 +++++++----- .../can/server/ServerRequestInstrumentation.scala | 2 +- .../spray/ClientRequestInstrumentationSpec.scala | 214 ++++++++++++++------- .../src/test/scala/kamon/spray/TestServer.scala | 4 +- 8 files changed, 255 insertions(+), 133 deletions(-) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 9b541a32..90928ba0 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -59,7 +59,7 @@ class ActorCellInstrumentation { def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] + val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] try { TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { @@ -154,13 +154,13 @@ class ActorCellMetricsIntoActorCellMixin { class TraceContextIntoEnvelopeMixin { @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default + def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} + def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { + def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 08289acf..c4c28a68 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -129,7 +129,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() - finishSegment(segmentName, label, (segmentFinishNanoTime - _segmentStartNanoTime)) + finishSegment(name, label, (segmentFinishNanoTime - _segmentStartNanoTime)) } } } @@ -155,7 +155,6 @@ object TraceContextOrigin { } trait TraceContextAware extends Serializable { - def captureNanoTime: Long def traceContext: TraceContext } @@ -163,7 +162,6 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - @transient val captureNanoTime = System.nanoTime() @transient val traceContext = TraceRecorder.currentContext // @@ -180,7 +178,17 @@ object TraceContextAware { } } -trait SegmentAware extends TraceContextAware { +trait TimestampedTraceContextAware extends TraceContextAware { + def captureNanoTime: Long +} + +object TimestampedTraceContextAware { + def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware { + @transient val captureNanoTime = System.nanoTime() + } +} + +trait SegmentAware { @volatile var segment: Segment = EmptyTraceContext.EmptySegment } diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf index d497e681..9fed5a2b 100644 --- a/kamon-spray/src/main/resources/reference.conf +++ b/kamon-spray/src/main/resources/reference.conf @@ -16,6 +16,9 @@ kamon { # in the `HttpRequest` headers. automatic-trace-token-propagation = true + # Fully qualified name of the implementation of kamon.spray.SprayNameGenerator that will be used for assigning names + # to traces and client http segments. + name-generator = kamon.spray.SimpleSprayNameGenerator client { # Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala index 76adb214..4a0fd74e 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala @@ -43,6 +43,9 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it. + private val nameGeneratorFQN = config.getString("name-generator") + private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems. + val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy = config.getString("client.segment-collection-strategy") match { case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining @@ -51,6 +54,19 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte s"only pipelining and internal are valid options.") } - // Later we should expose a way for the user to customize this. - def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address + def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request) + def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request) + def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request) +} + +trait SprayNameGenerator { + def generateTraceName(request: HttpRequest): String + def generateRequestLevelApiSegmentName(request: HttpRequest): String + def generateHostLevelApiSegmentName(request: HttpRequest): String +} + +class SimpleSprayNameGenerator extends SprayNameGenerator { + def generateRequestLevelApiSegmentName(request: HttpRequest): String = request.method.value + ": " + request.uri.path + def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path + def generateHostLevelApiSegmentName(request: HttpRequest): String = request.uri.authority.host.address } diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index cfd204df..94fc3572 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -18,7 +18,7 @@ package spray.can.client import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } +import spray.http._ import spray.http.HttpHeaders.RawHeader import kamon.trace._ import kamon.Kamon @@ -31,60 +31,79 @@ import akka.util.Timeout class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: SegmentAware = SegmentAware.default + def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default + + @DeclareMixin("spray.http.HttpRequest") + def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") - def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {} + def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {} @After("requestContextCreation(requestContext, request)") - def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = { - // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the - // segment the first time we create one. + def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = { + // This read to requestContext.traceContext takes care of initializing the aspect timely. + requestContext.traceContext - // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely. - if (requestContext.segment.isEmpty) { - TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + if (requestContext.segment.isEmpty) { + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + requestContext.segment = segment + } - requestContext.segment = segment - } + } else { - case EmptyTraceContext ⇒ // Nothing to do here. + // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a + // name again here is that when the request was initially sent it might not have the Host information available + // and it might be important to decide a proper segment name. + + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + request.asInstanceOf[SegmentAware].segment.rename(clientRequestName) } } } @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") - def copyingRequestContext(old: SegmentAware): Unit = {} + def copyingRequestContext(old: TraceContextAware): Unit = {} @Around("copyingRequestContext(old)") - def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = { TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { pjp.proceed() } } @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") - def dispatchToCommander(requestContext: SegmentAware, message: Any): Unit = {} + def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {} @Around("dispatchToCommander(requestContext, message)") - def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = { if (requestContext.traceContext.nonEmpty) { TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segment.finish() + requestContext.asInstanceOf[SegmentAware].segment.finish() pjp.proceed() } - } else pjp.proceed() } + @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)") + def copyingHttpRequest(old: SegmentAware): Unit = {} + + @Around("copyingHttpRequest(old)") + def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware] + copiedHttpRequest.segment = old.segment + copiedHttpRequest + } + @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {} @@ -93,46 +112,42 @@ class ClientRequestInstrumentation { val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]] (request: HttpRequest) ⇒ { - val responseFuture = originalSendReceive.apply(request) - - TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) + val segment = + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) + ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient) + else + EmptyTraceContext.EmptySegment - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + request.asInstanceOf[SegmentAware].segment = segment - responseFuture.onComplete { result ⇒ - segment.finish() - }(ec) - } + val responseFuture = originalSendReceive.apply(request) + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) - case EmptyTraceContext ⇒ // Nothing to do here. - } + responseFuture - responseFuture + } getOrElse (originalSendReceive.apply(request)) } - } - @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)") - def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {} + @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)") + def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {} - @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)") - def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)") + def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = { - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders - else - defaultHeaders + val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) + if (sprayExtension.includeTraceToken) + RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders - case EmptyTraceContext ⇒ defaultHeaders - } + } getOrElse (defaultHeaders) - pjp.proceed(Array(modifiedHeaders)) + pjp.proceed(Array[AnyRef](request, modifiedHeaders)) } } diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index 74d98564..eb25412b 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -39,7 +39,7 @@ class ServerRequestInstrumentation { val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system val sprayExtension = Kamon(Spray)(system) - val defaultTraceName: String = request.method.value + ": " + request.uri.path + val defaultTraceName = sprayExtension.generateTraceName(request) val token = if (sprayExtension.includeTraceToken) { request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value) } else None diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index fbf69c8a..57f9ebe1 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -18,22 +18,23 @@ package kamon.spray import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.ActorSystem +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader import kamon.Kamon import kamon.metric.{ TraceMetrics, Metrics } -import spray.client.pipelining +import spray.client.pipelining.sendReceive import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ -import akka.pattern.pipe import kamon.metric.TraceMetrics.TraceMetricsSnapshot -class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { +class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( """ |akka { @@ -41,8 +42,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit |} | |kamon { + | spray { + | name-generator = kamon.spray.TestSprayNameGenerator + | } + | | metrics { - | tick-interval = 2 seconds + | tick-interval = 1 hour | | filters = [ | { @@ -57,19 +62,21 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit """.stripMargin)) implicit def ec = system.dispatcher + implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis)) - "the client instrumentation" when { - "configured to do automatic-trace-token-propagation" should { - "include the trace token header on spray-client requests" in { + "the spray client instrumentation" when { + "using the request-level api" should { + "include the trace token header if automatic-trace-token-propagation is enabled" in { enableAutomaticTraceTokenPropagation() - - val (hostConnector, server) = buildSHostConnectorAndServer - val client = TestProbe() + val (_, server, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { - client.send(hostConnector, Get("/dummy-path")) - TraceRecorder.currentContext + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") + } + + (TraceRecorder.currentContext, rF) } // Accept the connection at the server side @@ -82,20 +89,105 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) - client.expectMsgType[HttpResponse] + responseFuture.futureValue.entity.asString should be("ok") testContext.finish() } - } - "not configured to do automatic-trace-token-propagation" should { - "not include the trace token header on spray-client requests" in { + "not include the trace token header if automatic-trace-token-propagation is disabled" in { disableAutomaticTraceTokenPropagation() + val (_, server, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") + } + + (TraceRecorder.currentContext, rF) + } + + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. + server.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + } + + "start and finish a segment that must be named using the request level api name assignation" in { + enableAutomaticTraceTokenPropagation() + enablePipeliningSegmentCollectionStrategy() + + val transport = TestProbe() + val (_, _, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") { + val rF = sendReceive(transport.ref)(ec, 10.seconds) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") + } + + (TraceRecorder.currentContext, rF) + } + + // Receive the request and reply back + transport.expectMsgType[HttpRequest] + transport.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + + val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + } + + "rename a request level api segment once it reaches the relevant host connector" in { + enableAutomaticTraceTokenPropagation() + enablePipeliningSegmentCollectionStrategy() + + val (_, server, bound) = buildSHostConnectorAndServer + + // Initiate a request within the context of a trace + val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") { + val rF = sendReceive(system, ec) { + Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") + } + + (TraceRecorder.currentContext, rF) + } - val (hostConnector, server) = buildSHostConnectorAndServer + // Accept the connection at the server side + server.expectMsgType[Http.Connected] + server.reply(Http.Register(server.ref)) + + // Receive the request and reply back + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + responseFuture.futureValue.entity.asString should be("ok") + testContext.finish() + + val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) + } + } + + "using the host-level api" should { + "include the trace token header on spray-client requests if automatic-trace-token-propagation is enabled" in { + enableAutomaticTraceTokenPropagation() + enableInternalSegmentCollectionStrategy() + + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { client.send(hostConnector, Get("/dummy-path")) TraceRecorder.currentContext } @@ -106,30 +198,24 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] testContext.finish() } - } - "configured to use pipelining segment collection strategy" should { - "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in { - enablePipeliningSegmentCollectionStrategy() + "not include the trace token header on spray-client requests if automatic-trace-token-propagation is disabled" in { + disableAutomaticTraceTokenPropagation() + enableInternalSegmentCollectionStrategy() - val (hostConnector, server) = buildSHostConnectorAndServer + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() - val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds) - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") { - pipeline(Get("/dummy-path")) to client.ref + val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + client.send(hostConnector, Get("/dummy-path")) TraceRecorder.currentContext } @@ -138,39 +224,25 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit server.reply(Http.Register(server.ref)) // Receive the request and reply back - val req = server.expectMsgType[HttpRequest] + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - - // Finish the trace testContext.finish() - - val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) - traceMetrics.elapsedTime.numberOfMeasurements should be(1L) - traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) - recordedSegment should not be empty - recordedSegment map { segmentMetrics ⇒ - segmentMetrics.numberOfMeasurements should be(1L) - } } - } - "configured to use internal segment collection strategy" should { - "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in { + "start and finish a segment that must be named using the host level api name assignation" in { + disableAutomaticTraceTokenPropagation() enableInternalSegmentCollectionStrategy() - val (hostConnector, server) = buildSHostConnectorAndServer + val (hostConnector, server, _) = buildSHostConnectorAndServer val client = TestProbe() - val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds) - - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") { - pipeline(Get("/dummy-path")) to client.ref + val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") { + client.send(hostConnector, Get("/host-level-api-segment")) TraceRecorder.currentContext } @@ -179,21 +251,17 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit server.reply(Http.Register(server.ref)) // Receive the request and reply back - server.expectMsgType[HttpRequest] + val request = server.expectMsgType[HttpRequest] + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + + // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - - // Finish the trace testContext.finish() - val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) - traceMetrics.elapsedTime.numberOfMeasurements should be(1L) - traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) - recordedSegment should not be empty - recordedSegment map { segmentMetrics ⇒ - segmentMetrics.numberOfMeasurements should be(1L) - } + val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api") + traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) + traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) } } } @@ -208,6 +276,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit metricsOption.get.asInstanceOf[TraceMetricsSnapshot] } + def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { + val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + recorder.get.collect(collectionContext) + } + def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal) def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining) def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) @@ -227,3 +301,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit field.set(target, include) } } + +class TestSprayNameGenerator extends SprayNameGenerator { + def generateTraceName(request: HttpRequest): String = request.uri.path.toString() + def generateRequestLevelApiSegmentName(request: HttpRequest): String = "request-level " + request.uri.path.toString() + def generateHostLevelApiSegmentName(request: HttpRequest): String = "host-level " + request.uri.path.toString() +} diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala index 65506770..379b8fc8 100644 --- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala +++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala @@ -45,13 +45,13 @@ trait TestServer { probe.sender } - def buildSHostConnectorAndServer: (ActorRef, TestProbe) = { + def buildSHostConnectorAndServer: (ActorRef, TestProbe, Http.Bound) = { val serverHandler = TestProbe() IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref) val bound = serverHandler.expectMsgType[Bound](10 seconds) val client = httpHostConnector(bound) - (client, serverHandler) + (client, serverHandler, bound) } private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = { -- cgit v1.2.3