diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-10-31 02:45:41 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-10-31 02:45:41 +0100 |
commit | 89d3057f8025add4b94b32c142e220ffb79f6c33 (patch) | |
tree | ca5cb3adccd9032450ec9f4fbfafb5542a52a315 /kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala | |
parent | cd8dce169b4231bf533445656bfb5a35034a6304 (diff) | |
download | Kamon-89d3057f8025add4b94b32c142e220ffb79f6c33.tar.gz Kamon-89d3057f8025add4b94b32c142e220ffb79f6c33.tar.bz2 Kamon-89d3057f8025add4b94b32c142e220ffb79f6c33.zip |
+ spray: external naming for traces and segments, related to #65
Diffstat (limited to 'kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala')
-rw-r--r-- | kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala | 121 |
1 files changed, 68 insertions, 53 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index cfd204df..94fc3572 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -18,7 +18,7 @@ package spray.can.client import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } +import spray.http._ import spray.http.HttpHeaders.RawHeader import kamon.trace._ import kamon.Kamon @@ -31,60 +31,79 @@ import akka.util.Timeout class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: SegmentAware = SegmentAware.default + def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default + + @DeclareMixin("spray.http.HttpRequest") + def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") - def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {} + def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {} @After("requestContextCreation(requestContext, request)") - def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = { - // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the - // segment the first time we create one. + def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = { + // This read to requestContext.traceContext takes care of initializing the aspect timely. + requestContext.traceContext - // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely. - if (requestContext.segment.isEmpty) { - TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + if (requestContext.segment.isEmpty) { + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + requestContext.segment = segment + } - requestContext.segment = segment - } + } else { - case EmptyTraceContext ⇒ // Nothing to do here. + // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a + // name again here is that when the request was initially sent it might not have the Host information available + // and it might be important to decide a proper segment name. + + val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) + request.asInstanceOf[SegmentAware].segment.rename(clientRequestName) } } } @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") - def copyingRequestContext(old: SegmentAware): Unit = {} + def copyingRequestContext(old: TraceContextAware): Unit = {} @Around("copyingRequestContext(old)") - def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = { TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { pjp.proceed() } } @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") - def dispatchToCommander(requestContext: SegmentAware, message: Any): Unit = {} + def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {} @Around("dispatchToCommander(requestContext, message)") - def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = { if (requestContext.traceContext.nonEmpty) { TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segment.finish() + requestContext.asInstanceOf[SegmentAware].segment.finish() pjp.proceed() } - } else pjp.proceed() } + @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)") + def copyingHttpRequest(old: SegmentAware): Unit = {} + + @Around("copyingHttpRequest(old)") + def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = { + val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware] + copiedHttpRequest.segment = old.segment + copiedHttpRequest + } + @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {} @@ -93,46 +112,42 @@ class ClientRequestInstrumentation { val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]] (request: HttpRequest) ⇒ { - val responseFuture = originalSendReceive.apply(request) - - TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) + val segment = + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) + ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient) + else + EmptyTraceContext.EmptySegment - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + request.asInstanceOf[SegmentAware].segment = segment - responseFuture.onComplete { result ⇒ - segment.finish() - }(ec) - } + val responseFuture = originalSendReceive.apply(request) + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) - case EmptyTraceContext ⇒ // Nothing to do here. - } + responseFuture - responseFuture + } getOrElse (originalSendReceive.apply(request)) } - } - @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)") - def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {} + @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)") + def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {} - @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)") - def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ - val sprayExtension = Kamon(Spray)(ctx.system) + @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)") + def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = { - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders - else - defaultHeaders + val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val sprayExtension = Kamon(Spray)(system) + if (sprayExtension.includeTraceToken) + RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders - case EmptyTraceContext ⇒ defaultHeaders - } + } getOrElse (defaultHeaders) - pjp.proceed(Array(modifiedHeaders)) + pjp.proceed(Array[AnyRef](request, modifiedHeaders)) } } |