From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- kamon-spray/src/main/resources/META-INF/aop.xml | 6 +- kamon-spray/src/main/resources/reference.conf | 7 +- .../scala/kamon/spray/KamonTraceDirectives.scala | 4 +- kamon-spray/src/main/scala/kamon/spray/Spray.scala | 81 ----------- .../main/scala/kamon/spray/SprayExtension.scala | 88 ++++++++++++ .../scala/kamon/spray/SprayExtensionSettings.scala | 35 +++++ .../ClientRequestInstrumentation.scala | 152 ++++++++++++++++++++ .../ServerRequestInstrumentation.scala | 136 ++++++++++++++++++ .../can/client/ClientRequestInstrumentation.scala | 153 --------------------- .../can/server/ServerRequestInstrumentation.scala | 136 ------------------ kamon-spray/src/test/resources/application.conf | 25 ---- .../spray/ClientRequestInstrumentationSpec.scala | 147 ++++++++------------ .../scala/kamon/spray/SprayServerMetricsSpec.scala | 67 ++++----- .../scala/kamon/spray/SprayServerTracingSpec.scala | 51 ++----- 14 files changed, 521 insertions(+), 567 deletions(-) delete mode 100644 kamon-spray/src/main/scala/kamon/spray/Spray.scala create mode 100644 kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala create mode 100644 kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala create mode 100644 kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala create mode 100644 kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala delete mode 100644 kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala delete mode 100644 kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala (limited to 'kamon-spray') diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml index 0e5726c6..00e8763a 100644 --- a/kamon-spray/src/main/resources/META-INF/aop.xml +++ b/kamon-spray/src/main/resources/META-INF/aop.xml @@ -2,14 +2,16 @@ + - + + - + diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf index 5c5e9317..bdba21cb 100644 --- a/kamon-spray/src/main/resources/reference.conf +++ b/kamon-spray/src/main/resources/reference.conf @@ -4,6 +4,7 @@ kamon { spray { + # Header name used when propagating the `TraceContext.token` value across applications. trace-token-header-name = "X-Trace-Token" @@ -23,16 +24,16 @@ kamon { client { # Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values # are: - # - pipelining: measures the time during which the user application code is waiting for a spray-client request to + # - request-level: measures the time during which the user application code is waiting for a spray-client request to # complete, by attaching a callback to the Future[HttpResponse] returned by `spray.client.pipelining.sendReceive`. # If `spray.client.pipelining.sendReceive` is not used, the segment measurement wont be performed. - # - internal: measures the internal time taken by spray-client to finish a request. Sometimes the user application + # - host-level: measures the internal time taken by spray-client to finish a request. Sometimes the user application # code has a finite future timeout (like when using `spray.client.pipelining.sendReceive`) that doesn't match # the actual amount of time spray might take internally to resolve a request, counting retries, redirects, # connection timeouts and so on. If using the internal strategy, the measured time will include the entire time # since the request has been received by the corresponding `HttpHostConnector` until a response is sent back # to the requester. - segment-collection-strategy = pipelining + instrumentation-level = request-level } } } \ No newline at end of file diff --git a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala index e98b63d9..4eefee95 100644 --- a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala +++ b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala @@ -17,11 +17,11 @@ package kamon.spray import spray.routing.directives.BasicDirectives import spray.routing._ -import kamon.trace.TraceRecorder +import kamon.trace.TraceContext trait KamonTraceDirectives extends BasicDirectives { def traceName(name: String): Directive0 = mapRequest { req ⇒ - TraceRecorder.rename(name) + TraceContext.currentContext.rename(name) req } } diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala deleted file mode 100644 index ab8d6a7d..00000000 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.spray - -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } -import akka.actor -import kamon.Kamon -import kamon.http.HttpServerMetrics -import kamon.metric.Metrics -import spray.http.HttpHeaders.Host -import spray.http.HttpRequest - -object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Spray - def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system) - - val SegmentLibraryName = "spray-client" -} - -object ClientSegmentCollectionStrategy { - sealed trait Strategy - case object Pipelining extends Strategy - case object Internal extends Strategy -} - -class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { - private val config = system.settings.config.getConfig("kamon.spray") - - val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation") - val traceTokenHeaderName: String = config.getString("trace-token-header-name") - val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get - // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it. - - private val nameGeneratorFQN = config.getString("name-generator") - private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems. - - val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy = - config.getString("client.segment-collection-strategy") match { - case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining - case "internal" ⇒ ClientSegmentCollectionStrategy.Internal - case other ⇒ throw new IllegalArgumentException(s"Configured segment-collection-strategy [$other] is invalid, " + - s"only pipelining and internal are valid options.") - } - - def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request) - def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request) - def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request) -} - -trait SprayNameGenerator { - def generateTraceName(request: HttpRequest): String - def generateRequestLevelApiSegmentName(request: HttpRequest): String - def generateHostLevelApiSegmentName(request: HttpRequest): String -} - -class DefaultSprayNameGenerator extends SprayNameGenerator { - def hostFromHeaders(request: HttpRequest): Option[String] = request.header[Host].map(_.host) - - def generateRequestLevelApiSegmentName(request: HttpRequest): String = { - val uriAddress = request.uri.authority.host.address - if (uriAddress.equals("")) hostFromHeaders(request).getOrElse("unknown-host") else uriAddress - } - - def generateHostLevelApiSegmentName(request: HttpRequest): String = hostFromHeaders(request).getOrElse("unknown-host") - - def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path -} diff --git a/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala new file mode 100644 index 00000000..3df8d972 --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala @@ -0,0 +1,88 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spray + +import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor +import akka.event.{ Logging, LoggingAdapter } +import kamon.Kamon +import kamon.http.HttpServerMetrics +import kamon.metric.{ Entity, Metrics } +import spray.http.HttpHeaders.Host +import spray.http.HttpRequest + +object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = Spray + def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtensionImpl(system) + + val SegmentLibraryName = "spray-client" +} + +trait SprayExtension extends Kamon.Extension { + def settings: SprayExtensionSettings + def log: LoggingAdapter + def httpServerMetrics: HttpServerMetrics + def generateTraceName(request: HttpRequest): String + def generateRequestLevelApiSegmentName(request: HttpRequest): String + def generateHostLevelApiSegmentName(request: HttpRequest): String +} + +class SprayExtensionImpl(system: ExtendedActorSystem) extends SprayExtension { + val settings = SprayExtensionSettings(system) + val log = Logging(system, "SprayExtension") + + val httpServerMetrics = { + val metricsExtension = Metrics.get(system) + val factory = metricsExtension.instrumentFactory(HttpServerMetrics.category) + val entity = Entity("spray-server", HttpServerMetrics.category) + + Metrics.get(system).register(entity, new HttpServerMetrics(factory)).recorder + } + + def generateTraceName(request: HttpRequest): String = + settings.nameGenerator.generateTraceName(request) + + def generateRequestLevelApiSegmentName(request: HttpRequest): String = + settings.nameGenerator.generateRequestLevelApiSegmentName(request) + + def generateHostLevelApiSegmentName(request: HttpRequest): String = + settings.nameGenerator.generateHostLevelApiSegmentName(request) +} + +trait SprayNameGenerator { + def generateTraceName(request: HttpRequest): String + def generateRequestLevelApiSegmentName(request: HttpRequest): String + def generateHostLevelApiSegmentName(request: HttpRequest): String +} + +class DefaultSprayNameGenerator extends SprayNameGenerator { + + def generateRequestLevelApiSegmentName(request: HttpRequest): String = { + val uriAddress = request.uri.authority.host.address + if (uriAddress.equals("")) hostFromHeaders(request).getOrElse("unknown-host") else uriAddress + } + + def generateHostLevelApiSegmentName(request: HttpRequest): String = + hostFromHeaders(request).getOrElse("unknown-host") + + def generateTraceName(request: HttpRequest): String = + request.method.value + ": " + request.uri.path + + private def hostFromHeaders(request: HttpRequest): Option[String] = + request.header[Host].map(_.host) + +} diff --git a/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala new file mode 100644 index 00000000..44c71eaf --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala @@ -0,0 +1,35 @@ +package kamon.spray + +import akka.actor.ExtendedActorSystem + +case class SprayExtensionSettings( + includeTraceTokenHeader: Boolean, + traceTokenHeaderName: String, + nameGenerator: SprayNameGenerator, + clientInstrumentationLevel: ClientInstrumentationLevel.Level) + +object SprayExtensionSettings { + def apply(system: ExtendedActorSystem): SprayExtensionSettings = { + val config = system.settings.config.getConfig("kamon.spray") + + val includeTraceTokenHeader: Boolean = config.getBoolean("automatic-trace-token-propagation") + val traceTokenHeaderName: String = config.getString("trace-token-header-name") + + val nameGeneratorFQN = config.getString("name-generator") + val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems. + + val clientInstrumentationLevel: ClientInstrumentationLevel.Level = config.getString("client.instrumentation-level") match { + case "request-level" ⇒ ClientInstrumentationLevel.RequestLevelAPI + case "host-level" ⇒ ClientInstrumentationLevel.HostLevelAPI + case other ⇒ sys.error(s"Invalid client instrumentation level [$other] found in configuration.") + } + + SprayExtensionSettings(includeTraceTokenHeader, traceTokenHeaderName, nameGenerator, clientInstrumentationLevel) + } +} + +object ClientInstrumentationLevel { + sealed trait Level + case object RequestLevelAPI extends Level + case object HostLevelAPI extends Level +} diff --git a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala new file mode 100644 index 00000000..fa9063ad --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala @@ -0,0 +1,152 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package spray.can.client + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import spray.http._ +import spray.http.HttpHeaders.RawHeader +import kamon.trace._ +import kamon.spray.{ ClientInstrumentationLevel, Spray } +import akka.actor.ActorRef +import scala.concurrent.{ Future, ExecutionContext } +import akka.util.Timeout + +@Aspect +class ClientRequestInstrumentation { + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default + + @DeclareMixin("spray.http.HttpRequest") + def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default + + @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") + def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {} + + @After("requestContextCreation(requestContext, request)") + def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = { + // This read to requestContext.traceContext takes care of initializing the aspect timely. + requestContext.traceContext + + TraceContext.map { ctx ⇒ + val sprayExtension = ctx.lookupExtension(Spray) + + if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.HostLevelAPI) { + if (requestContext.segment.isEmpty) { + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName) + requestContext.segment = segment + } + + } else { + + // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a + // name again here is that when the request was initially sent it might not have the Host information available + // and it might be important to decide a proper segment name. + + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + request.asInstanceOf[SegmentAware].segment.rename(clientRequestName) + } + } + } + + @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") + def copyingRequestContext(old: TraceContextAware): Unit = {} + + @Around("copyingRequestContext(old)") + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = { + TraceContext.withContext(old.traceContext) { + pjp.proceed() + } + } + + @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") + def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {} + + @Around("dispatchToCommander(requestContext, message)") + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = { + if (requestContext.traceContext.nonEmpty) { + TraceContext.withContext(requestContext.traceContext) { + if (message.isInstanceOf[HttpMessageEnd]) + requestContext.asInstanceOf[SegmentAware].segment.finish() + + pjp.proceed() + } + } else pjp.proceed() + } + + @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)") + def copyingHttpRequest(old: SegmentAware): Unit = {} + + @Around("copyingHttpRequest(old)") + def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware] + copiedHttpRequest.segment = old.segment + copiedHttpRequest + } + + @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") + def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {} + + @Around("requestLevelApiSendReceive(transport, ec, timeout)") + def aroundRequestLevelApiSendReceive(pjp: ProceedingJoinPoint, transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Any = { + val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]] + + (request: HttpRequest) ⇒ { + TraceContext.map { ctx ⇒ + val sprayExtension = ctx.lookupExtension(Spray) + val segment = + if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.RequestLevelAPI) + ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName) + else + EmptyTraceContext.EmptySegment + + request.asInstanceOf[SegmentAware].segment = segment + + val responseFuture = originalSendReceive.apply(request) + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) + + responseFuture + + } getOrElse (originalSendReceive.apply(request)) + } + } + + @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)") + def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {} + + @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)") + def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = { + + val modifiedHeaders = TraceContext.map { ctx ⇒ + val sprayExtension = ctx.lookupExtension(Spray) + if (sprayExtension.settings.includeTraceTokenHeader) + RawHeader(sprayExtension.settings.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders + + } getOrElse (defaultHeaders) + + pjp.proceed(Array[AnyRef](request, modifiedHeaders)) + } +} \ No newline at end of file diff --git a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala new file mode 100644 index 00000000..73287132 --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala @@ -0,0 +1,136 @@ +/* =================================================== + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package spray.can.server.instrumentation + +import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey } +import org.aspectj.lang.annotation._ +import kamon.trace._ +import akka.actor.ActorSystem +import spray.can.server.OpenRequest +import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } +import kamon.Kamon +import kamon.spray.{ SprayExtension, Spray } +import org.aspectj.lang.ProceedingJoinPoint +import spray.http.HttpHeaders.RawHeader + +@Aspect +class ServerRequestInstrumentation { + + import ServerRequestInstrumentation._ + + @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") + def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") + def openRequestInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {} + + @After("openRequestInit(openRequest, request)") + def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + val tracer = Tracer.get(system) + val sprayExtension = Kamon(Spray)(system) + + val defaultTraceName = sprayExtension.generateTraceName(request) + val token = if (sprayExtension.settings.includeTraceTokenHeader) { + request.headers.find(_.name == sprayExtension.settings.traceTokenHeaderName).map(_.value) + } else None + + val newContext = token.map(customToken ⇒ tracer.newContext(defaultTraceName, customToken)) getOrElse (tracer.newContext(defaultTraceName)) + TraceContext.setCurrentContext(newContext) + + // Necessary to force initialization of traceContext when initiating the request. + openRequest.traceContext + } + + @Pointcut("execution(* spray.can.server.ServerFrontend$$anon$2$$anon$1.spray$can$server$ServerFrontend$$anon$$anon$$openNewRequest(..))") + def openNewRequest(): Unit = {} + + @After("openNewRequest()") + def afterOpenNewRequest(): Unit = { + TraceContext.clearCurrentContext + } + + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)") + def openRequestCreation(openRequest: TraceContextAware, response: HttpMessagePartWrapper): Unit = {} + + @Around("openRequestCreation(openRequest, response)") + def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = { + val incomingContext = TraceContext.currentContext + val storedContext = openRequest.traceContext + + // The stored context is always a DefaultTraceContext if the instrumentation is running + verifyTraceContextConsistency(incomingContext, storedContext) + + if (incomingContext.isEmpty) + pjp.proceed() + else { + val sprayExtension = incomingContext.lookupExtension(Spray) + + val proceedResult = if (sprayExtension.settings.includeTraceTokenHeader) { + val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.settings.traceTokenHeaderName, incomingContext.token) + pjp.proceed(Array(openRequest, responseWithHeader)) + + } else pjp.proceed + + TraceContext.currentContext.finish() + + recordHttpServerMetrics(response, incomingContext.name, sprayExtension) + + //store in TraceLocal useful data to diagnose errors + storeDiagnosticData(openRequest) + + proceedResult + } + } + + def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext): Unit = { + def publishWarning(text: String): Unit = + storedTraceContext.lookupExtension(Spray).log.warning(text) + + if (incomingTraceContext.nonEmpty) { + if (incomingTraceContext.token != storedTraceContext.token) + publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]") + } else + publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]") + } + + def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = + response match { + case httpResponse: HttpResponse ⇒ sprayExtension.httpServerMetrics.recordResponse(traceName, httpResponse.status.intValue.toString) + case other ⇒ // Nothing to do then. + } + + def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper = + response match { + case response: HttpResponse ⇒ response.withHeaders(response.headers ::: RawHeader(traceTokenHeaderName, token) :: Nil) + case other ⇒ other + } + + def storeDiagnosticData(currentContext: TraceContextAware): Unit = { + val request = currentContext.asInstanceOf[OpenRequest].request + val headers = request.headers.map(header ⇒ header.name -> header.value).toMap + val agent = headers.getOrElse(UserAgent, Unknown) + val forwarded = headers.getOrElse(XForwardedFor, Unknown) + + TraceLocal.store(HttpContextKey)(HttpContext(agent, request.uri.toRelative.toString(), forwarded)) + } +} + +object ServerRequestInstrumentation { + val UserAgent = "User-Agent" + val XForwardedFor = "X-Forwarded-For" + val Unknown = "unknown" +} diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala deleted file mode 100644 index 813915c4..00000000 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package spray.can.client - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import spray.http._ -import spray.http.HttpHeaders.RawHeader -import kamon.trace._ -import kamon.Kamon -import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } -import akka.actor.ActorRef -import scala.concurrent.{ Future, ExecutionContext } -import akka.util.Timeout - -@Aspect -class ClientRequestInstrumentation { - - @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default - - @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default - - @DeclareMixin("spray.http.HttpRequest") - def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default - - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") - def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {} - - @After("requestContextCreation(requestContext, request)") - def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = { - // This read to requestContext.traceContext takes care of initializing the aspect timely. - requestContext.traceContext - - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val sprayExtension = Kamon(Spray)(system) - - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - if (requestContext.segment.isEmpty) { - val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) - val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName) - requestContext.segment = segment - } - - } else { - - // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a - // name again here is that when the request was initially sent it might not have the Host information available - // and it might be important to decide a proper segment name. - - val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) - request.asInstanceOf[SegmentAware].segment.rename(clientRequestName) - } - } - } - - @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") - def copyingRequestContext(old: TraceContextAware): Unit = {} - - @Around("copyingRequestContext(old)") - def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { - pjp.proceed() - } - } - - @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") - def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {} - - @Around("dispatchToCommander(requestContext, message)") - def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = { - if (requestContext.traceContext.nonEmpty) { - TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { - if (message.isInstanceOf[HttpMessageEnd]) - requestContext.asInstanceOf[SegmentAware].segment.finish() - - pjp.proceed() - } - } else pjp.proceed() - } - - @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)") - def copyingHttpRequest(old: SegmentAware): Unit = {} - - @Around("copyingHttpRequest(old)") - def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { - val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware] - copiedHttpRequest.segment = old.segment - copiedHttpRequest - } - - @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") - def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {} - - @Around("requestLevelApiSendReceive(transport, ec, timeout)") - def aroundRequestLevelApiSendReceive(pjp: ProceedingJoinPoint, transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Any = { - val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]] - - (request: HttpRequest) ⇒ { - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val sprayExtension = Kamon(Spray)(system) - val segment = - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) - ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName) - else - EmptyTraceContext.EmptySegment - - request.asInstanceOf[SegmentAware].segment = segment - - val responseFuture = originalSendReceive.apply(request) - responseFuture.onComplete { result ⇒ - segment.finish() - }(ec) - - responseFuture - - } getOrElse (originalSendReceive.apply(request)) - } - } - - @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)") - def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {} - - @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)") - def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = { - - val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val sprayExtension = Kamon(Spray)(system) - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders - else - defaultHeaders - - } getOrElse (defaultHeaders) - - pjp.proceed(Array[AnyRef](request, modifiedHeaders)) - } -} diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala deleted file mode 100644 index 1ae4ad80..00000000 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package spray.can.server - -import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey } -import org.aspectj.lang.annotation._ -import kamon.trace._ -import akka.actor.ActorSystem -import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } -import akka.event.Logging.Warning -import kamon.Kamon -import kamon.spray.{ SprayExtension, Spray } -import org.aspectj.lang.ProceedingJoinPoint -import spray.http.HttpHeaders.RawHeader - -@Aspect -class ServerRequestInstrumentation { - - import ServerRequestInstrumentation._ - - @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") - def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") - def openRequestInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {} - - @After("openRequestInit(openRequest, request)") - def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = { - val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system - val sprayExtension = Kamon(Spray)(system) - - val defaultTraceName = sprayExtension.generateTraceName(request) - val token = if (sprayExtension.includeTraceToken) { - request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value) - } else None - - TraceRecorder.start(defaultTraceName, token)(system) - - // Necessary to force initialization of traceContext when initiating the request. - openRequest.traceContext - } - - @Pointcut("execution(* spray.can.server.ServerFrontend$$anon$2$$anon$1.spray$can$server$ServerFrontend$$anon$$anon$$openNewRequest(..))") - def openNewRequest(): Unit = {} - - @After("openNewRequest()") - def afterOpenNewRequest(): Unit = { - TraceRecorder.clearContext - } - - @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)") - def openRequestCreation(openRequest: TraceContextAware, response: HttpMessagePartWrapper): Unit = {} - - @Around("openRequestCreation(openRequest, response)") - def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = { - val incomingContext = TraceRecorder.currentContext - val storedContext = openRequest.traceContext - - // The stored context is always a DefaultTraceContext if the instrumentation is running - val system = storedContext.system - - verifyTraceContextConsistency(incomingContext, storedContext, system) - - if (incomingContext.isEmpty) - pjp.proceed() - else { - val sprayExtension = Kamon(Spray)(system) - - val proceedResult = if (sprayExtension.includeTraceToken) { - val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token) - pjp.proceed(Array(openRequest, responseWithHeader)) - - } else pjp.proceed - - TraceRecorder.finish() - - recordHttpServerMetrics(response, incomingContext.name, sprayExtension) - - //store in TraceLocal useful data to diagnose errors - storeDiagnosticData(openRequest) - - proceedResult - } - } - - def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = { - def publishWarning(text: String, system: ActorSystem): Unit = - system.eventStream.publish(Warning("ServerRequestInstrumentation", classOf[ServerRequestInstrumentation], text)) - - if (incomingTraceContext.nonEmpty) { - if (incomingTraceContext.token != storedTraceContext.token) - publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) - } else - publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system) - } - - def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = - response match { - case httpResponse: HttpResponse ⇒ sprayExtension.httpServerMetrics.recordResponse(traceName, httpResponse.status.intValue.toString) - case other ⇒ // Nothing to do then. - } - - def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper = - response match { - case response: HttpResponse ⇒ response.withHeaders(response.headers ::: RawHeader(traceTokenHeaderName, token) :: Nil) - case other ⇒ other - } - - def storeDiagnosticData(currentContext: TraceContextAware): Unit = { - val request = currentContext.asInstanceOf[OpenRequest].request - val headers = request.headers.map(header ⇒ header.name -> header.value).toMap - val agent = headers.getOrElse(UserAgent, Unknown) - val forwarded = headers.getOrElse(XForwardedFor, Unknown) - - TraceLocal.store(HttpContextKey)(HttpContext(agent, request.uri.toRelative.toString(), forwarded)) - } -} - -object ServerRequestInstrumentation { - val UserAgent = "User-Agent" - val XForwardedFor = "X-Forwarded-For" - val Unknown = "unknown" -} diff --git a/kamon-spray/src/test/resources/application.conf b/kamon-spray/src/test/resources/application.conf index 4a9b2c67..8b137891 100644 --- a/kamon-spray/src/test/resources/application.conf +++ b/kamon-spray/src/test/resources/application.conf @@ -1,26 +1 @@ -kamon { - metrics { - tick-interval = 1 second - filters = [ - { - actor { - includes = [] - excludes = [ "system/*", "user/IO-*" ] - } - }, - { - trace { - includes = [ "*" ] - excludes = [] - } - }, - { - dispatcher { - includes = [ "default-dispatcher" ] - excludes = [] - } - } - ] - } -} \ No newline at end of file diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index b90b0f3b..c5d7d992 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -16,50 +16,36 @@ package kamon.spray -import akka.testkit.{ TestKitBase, TestProbe } -import akka.actor.ActorSystem +import akka.testkit.TestProbe +import kamon.testkit.BaseKamonSpec import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{ Millis, Seconds, Span } -import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } +import kamon.trace.{ TraceContext, SegmentCategory } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader import kamon.Kamon -import kamon.metric.{ TraceMetrics, Metrics } +import kamon.metric.TraceMetricsSpec import spray.client.pipelining.sendReceive -import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ -import kamon.metric.TraceMetrics.TraceMetricsSnapshot - -class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer { - implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( - """ - |akka { - | loglevel = ERROR - |} - | - |kamon { - | spray { - | name-generator = kamon.spray.TestSprayNameGenerator - | } - | - | metrics { - | tick-interval = 1 hour - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [] - | } - | } - | ] - | } - |} - """.stripMargin)) + +class ClientRequestInstrumentationSpec extends BaseKamonSpec("client-request-instrumentation-spec") with ScalaFutures + with RequestBuilding with TestServer { + + import TraceMetricsSpec.SegmentSyntax + + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | metric.tick-interval = 1 hour + | spray.name-generator = kamon.spray.TestSprayNameGenerator + |} + | + |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] + """.stripMargin) implicit def ec = system.dispatcher implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis)) @@ -71,12 +57,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val (_, server, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") { + val (testContext, responseFuture) = TraceContext.withContext(newContext("include-trace-token-header-at-request-level-api")) { val rF = sendReceive(system, ec) { Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") } - (TraceRecorder.currentContext, rF) + (TraceContext.currentContext, rF) } // Accept the connection at the server side @@ -85,7 +71,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should contain(traceTokenHeader(testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) @@ -98,12 +84,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val (_, server, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") { + val (testContext, responseFuture) = TraceContext.withContext(newContext("do-not-include-trace-token-header-at-request-level-api")) { val rF = sendReceive(system, ec) { Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path") } - (TraceRecorder.currentContext, rF) + (TraceContext.currentContext, rF) } // Accept the connection at the server side @@ -112,7 +98,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should not contain (traceTokenHeader(testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) @@ -128,12 +114,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val (_, _, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") { + val (testContext, responseFuture) = TraceContext.withContext(newContext("assign-name-to-segment-with-request-level-api")) { val rF = sendReceive(transport.ref)(ec, 10.seconds) { Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") } - (TraceRecorder.currentContext, rF) + (TraceContext.currentContext, rF) } // Receive the request and reply back @@ -142,10 +128,10 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit responseFuture.futureValue.entity.asString should be("ok") testContext.finish() - val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api") - traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", - SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) + val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api", "trace") + traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceMetricsSnapshot.segment("request-level /request-level-api-segment", SegmentCategory.HttpClient, Spray.SegmentLibraryName) + .numberOfMeasurements should be(1) } "rename a request level api segment once it reaches the relevant host connector" in { @@ -155,12 +141,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val (_, server, bound) = buildSHostConnectorAndServer // Initiate a request within the context of a trace - val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") { + val (testContext, responseFuture) = TraceContext.withContext(newContext("rename-segment-with-request-level-api")) { val rF = sendReceive(system, ec) { Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment") } - (TraceRecorder.currentContext, rF) + (TraceContext.currentContext, rF) } // Accept the connection at the server side @@ -173,10 +159,10 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit responseFuture.futureValue.entity.asString should be("ok") testContext.finish() - val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api") - traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", - SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) + val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api", "trace") + traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceMetricsSnapshot.segment("host-level /request-level-api-segment", SegmentCategory.HttpClient, Spray.SegmentLibraryName) + .numberOfMeasurements should be(1) } } @@ -189,9 +175,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val client = TestProbe() // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") { + val testContext = TraceContext.withContext(newContext("include-trace-token-header-on-http-client-request")) { client.send(hostConnector, Get("/dummy-path")) - TraceRecorder.currentContext + TraceContext.currentContext } // Accept the connection at the server side @@ -200,7 +186,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should contain(traceTokenHeader(testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) @@ -216,9 +202,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val client = TestProbe() // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") { + val testContext = TraceContext.withContext(newContext("not-include-trace-token-header-on-http-client-request")) { client.send(hostConnector, Get("/dummy-path")) - TraceRecorder.currentContext + TraceContext.currentContext } // Accept the connection at the server side @@ -227,7 +213,7 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should not contain (traceTokenHeader(testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) @@ -243,9 +229,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit val client = TestProbe() // Initiate a request within the context of a trace - val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") { + val testContext = TraceContext.withContext(newContext("create-segment-with-host-level-api")) { client.send(hostConnector, Get("/host-level-api-segment")) - TraceRecorder.currentContext + TraceContext.currentContext } // Accept the connection at the server side @@ -254,52 +240,39 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) + request.headers should not contain (traceTokenHeader(testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] testContext.finish() - val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api") - traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1) - traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", - SegmentCategory.HttpClient, Spray.SegmentLibraryName)).numberOfMeasurements should be(1) + val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api", "trace") + traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceMetricsSnapshot.segment("host-level /host-level-api-segment", SegmentCategory.HttpClient, Spray.SegmentLibraryName) + .numberOfMeasurements should be(1) } } } - def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricsSnapshot = { - val tickSnapshot = within(timeout) { - listener.expectMsgType[TickMetricSnapshot] - } - - val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName)) - metricsOption should not be empty - metricsOption.get.asInstanceOf[TraceMetricsSnapshot] - } - - def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { - val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory) - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - recorder.get.collect(collectionContext) - } + def traceTokenHeader(token: String): RawHeader = + RawHeader(Kamon(Spray).settings.traceTokenHeaderName, token) - def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal) - def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining) + def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientInstrumentationLevel.HostLevelAPI) + def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientInstrumentationLevel.RequestLevelAPI) def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false) - def setSegmentCollectionStrategy(strategy: ClientSegmentCollectionStrategy.Strategy): Unit = { - val target = Kamon(Spray)(system) - val field = target.getClass.getDeclaredField("clientSegmentCollectionStrategy") + def setSegmentCollectionStrategy(strategy: ClientInstrumentationLevel.Level): Unit = { + val target = Kamon(Spray)(system).settings + val field = target.getClass.getDeclaredField("clientInstrumentationLevel") field.setAccessible(true) field.set(target, strategy) } def setIncludeTraceToken(include: Boolean): Unit = { - val target = Kamon(Spray)(system) - val field = target.getClass.getDeclaredField("includeTraceToken") + val target = Kamon(Spray)(system).settings + val field = target.getClass.getDeclaredField("includeTraceTokenHeader") field.setAccessible(true) field.set(target, include) } diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala index c4b370d7..58bb2885 100644 --- a/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerMetricsSpec.scala @@ -1,46 +1,27 @@ package kamon.spray -import akka.actor.ActorSystem -import akka.testkit.{ TestProbe, TestKitBase } +import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.http.HttpServerMetrics -import kamon.metric._ +import kamon.testkit.BaseKamonSpec import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } -import org.scalatest.{ Matchers, WordSpecLike } import spray.http.{ StatusCodes, HttpResponse, HttpRequest } import spray.httpx.RequestBuilding -class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding - with ScalaFutures with PatienceConfiguration with TestServer { +class SprayServerMetricsSpec extends BaseKamonSpec("spray-server-metrics-spec") with RequestBuilding with ScalaFutures + with PatienceConfiguration with TestServer { - val collectionContext = CollectionContext(100) - - implicit lazy val system: ActorSystem = ActorSystem("spray-server-metrics-spec", ConfigFactory.parseString( - """ - |akka { - | loglevel = ERROR - |} - | - |kamon { - | metrics { - | tick-interval = 1 hour - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [] - | } - | } - | ] - | } - |} - """.stripMargin)) + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + |} + | + |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] + """.stripMargin) "the Spray Server metrics instrumentation" should { - "record trace metrics for requests received" in { - Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext) + "record trace metrics for processed requests" in { val (connection, server) = buildClientConnectionAndServer val client = TestProbe() @@ -58,15 +39,17 @@ class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers client.expectMsgType[HttpResponse] } - val snapshot = Kamon(Metrics)(system).register(TraceMetrics("GET: /record-trace-metrics"), TraceMetrics.Factory).get.collect(collectionContext) - snapshot.elapsedTime.numberOfMeasurements should be(15) + val snapshot = takeSnapshotOf("GET: /record-trace-metrics", "trace") + snapshot.histogram("elapsed-time").get.numberOfMeasurements should be(15) } - "record http serve metrics for all the requests" in { - Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext) + "record http server metrics for all the requests" in { val (connection, server) = buildClientConnectionAndServer val client = TestProbe() + // Erase metrics recorder from previous tests. + takeSnapshotOf("spray-server", "http-server") + for (repetition ← 1 to 10) { client.send(connection, Get("/record-http-metrics")) server.expectMsgType[HttpRequest] @@ -81,11 +64,11 @@ class SprayServerMetricsSpec extends TestKitBase with WordSpecLike with Matchers client.expectMsgType[HttpResponse] } - val snapshot = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext) - snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("200").count should be(10) - snapshot.countsPerTraceAndStatusCode("GET: /record-http-metrics")("400").count should be(5) - snapshot.countsPerStatusCode("200").count should be(10) - snapshot.countsPerStatusCode("400").count should be(5) + val snapshot = takeSnapshotOf("spray-server", "http-server") + snapshot.counter("GET: /record-http-metrics_200").get.count should be(10) + snapshot.counter("GET: /record-http-metrics_400").get.count should be(5) + snapshot.counter("200").get.count should be(10) + snapshot.counter("400").get.count should be(5) } } } diff --git a/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala index 30d42eea..1ae0cb98 100644 --- a/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/SprayServerTracingSpec.scala @@ -17,39 +17,15 @@ package kamon.spray import _root_.spray.httpx.RequestBuilding -import akka.testkit.{ TestKitBase, TestProbe } -import akka.actor.ActorSystem -import org.scalatest.{ Matchers, WordSpecLike } +import akka.testkit.TestProbe +import kamon.testkit.BaseKamonSpec import kamon.Kamon import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } import spray.http.HttpHeaders.RawHeader import spray.http.{ HttpResponse, HttpRequest } -import com.typesafe.config.ConfigFactory - -class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding - with ScalaFutures with PatienceConfiguration with TestServer { - - implicit lazy val system: ActorSystem = ActorSystem("spray-server-tracing-spec", ConfigFactory.parseString( - """ - |akka { - | loglevel = ERROR - |} - | - |kamon { - | metrics { - | tick-interval = 2 seconds - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [] - | } - | } - | ] - | } - |} - """.stripMargin)) + +class SprayServerTracingSpec extends BaseKamonSpec("spray-server-tracing-spec") with RequestBuilding with ScalaFutures + with PatienceConfiguration with TestServer { "the spray server request tracing instrumentation" should { "include the trace-token header in responses when the automatic-trace-token-propagation is enabled" in { @@ -58,12 +34,12 @@ class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers val (connection, server) = buildClientConnectionAndServer val client = TestProbe() - client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled"))) + client.send(connection, Get("/").withHeaders(traceTokenHeader("propagation-enabled"))) server.expectMsgType[HttpRequest] server.reply(HttpResponse(entity = "ok")) val response = client.expectMsgType[HttpResponse] - response.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled")) + response.headers should contain(traceTokenHeader("propagation-enabled")) } "reply back with an automatically assigned trace token if none was provided with the request and automatic-trace-token-propagation is enabled" in { @@ -77,7 +53,7 @@ class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers server.reply(HttpResponse(entity = "ok")) val response = client.expectMsgType[HttpResponse] - response.headers.count(_.name == Kamon(Spray).traceTokenHeaderName) should be(1) + response.headers.count(_.name == Kamon(Spray).settings.traceTokenHeaderName) should be(1) } @@ -87,21 +63,24 @@ class SprayServerTracingSpec extends TestKitBase with WordSpecLike with Matchers val (connection, server) = buildClientConnectionAndServer val client = TestProbe() - client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled"))) + client.send(connection, Get("/").withHeaders(traceTokenHeader("propagation-disabled"))) server.expectMsgType[HttpRequest] server.reply(HttpResponse(entity = "ok")) val response = client.expectMsgType[HttpResponse] - response.headers should not contain RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled") + response.headers should not contain traceTokenHeader("propagation-disabled") } } + def traceTokenHeader(token: String): RawHeader = + RawHeader(Kamon(Spray).settings.traceTokenHeaderName, token) + def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true) def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false) def setIncludeTraceToken(include: Boolean): Unit = { - val target = Kamon(Spray)(system) - val field = target.getClass.getDeclaredField("includeTraceToken") + val target = Kamon(Spray)(system).settings + val field = target.getClass.getDeclaredField("includeTraceTokenHeader") field.setAccessible(true) field.set(target, include) } -- cgit v1.2.3