aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala')
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala121
1 files changed, 68 insertions, 53 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index cfd204df..94fc3572 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -18,7 +18,7 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
+import spray.http._
import spray.http.HttpHeaders.RawHeader
import kamon.trace._
import kamon.Kamon
@@ -31,60 +31,79 @@ import akka.util.Timeout
class ClientRequestInstrumentation {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixin: SegmentAware = SegmentAware.default
+ def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default
+
+ @DeclareMixin("spray.http.HttpRequest")
+ def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default
@Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
- def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {}
+ def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {}
@After("requestContextCreation(requestContext, request)")
- def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {
- // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
- // segment the first time we create one.
+ def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {
+ // This read to requestContext.traceContext takes care of initializing the aspect timely.
+ requestContext.traceContext
- // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
- if (requestContext.segment.isEmpty) {
- TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒
- val sprayExtension = Kamon(Spray)(ctx.system)
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ if (requestContext.segment.isEmpty) {
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ requestContext.segment = segment
+ }
- requestContext.segment = segment
- }
+ } else {
- case EmptyTraceContext ⇒ // Nothing to do here.
+ // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a
+ // name again here is that when the request was initially sent it might not have the Host information available
+ // and it might be important to decide a proper segment name.
+
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ request.asInstanceOf[SegmentAware].segment.rename(clientRequestName)
}
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
- def copyingRequestContext(old: SegmentAware): Unit = {}
+ def copyingRequestContext(old: TraceContextAware): Unit = {}
@Around("copyingRequestContext(old)")
- def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = {
TraceRecorder.withInlineTraceContextReplacement(old.traceContext) {
pjp.proceed()
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
- def dispatchToCommander(requestContext: SegmentAware, message: Any): Unit = {}
+ def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {}
@Around("dispatchToCommander(requestContext, message)")
- def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = {
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
if (requestContext.traceContext.nonEmpty) {
TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) {
if (message.isInstanceOf[HttpMessageEnd])
- requestContext.segment.finish()
+ requestContext.asInstanceOf[SegmentAware].segment.finish()
pjp.proceed()
}
-
} else pjp.proceed()
}
+ @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)")
+ def copyingHttpRequest(old: SegmentAware): Unit = {}
+
+ @Around("copyingHttpRequest(old)")
+ def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware]
+ copiedHttpRequest.segment = old.segment
+ copiedHttpRequest
+ }
+
@Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {}
@@ -93,46 +112,42 @@ class ClientRequestInstrumentation {
val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
(request: HttpRequest) ⇒ {
- val responseFuture = originalSendReceive.apply(request)
-
- TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒
- val sprayExtension = Kamon(Spray)(ctx.system)
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
+ val segment =
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining)
+ ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient)
+ else
+ EmptyTraceContext.EmptySegment
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ request.asInstanceOf[SegmentAware].segment = segment
- responseFuture.onComplete { result ⇒
- segment.finish()
- }(ec)
- }
+ val responseFuture = originalSendReceive.apply(request)
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
- case EmptyTraceContext ⇒ // Nothing to do here.
- }
+ responseFuture
- responseFuture
+ } getOrElse (originalSendReceive.apply(request))
}
-
}
- @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
- def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
+ @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {}
- @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
- def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
- val modifiedHeaders = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒
- val sprayExtension = Kamon(Spray)(ctx.system)
+ @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = {
- if (sprayExtension.includeTraceToken)
- RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
- else
- defaultHeaders
+ val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ else
+ defaultHeaders
- case EmptyTraceContext ⇒ defaultHeaders
- }
+ } getOrElse (defaultHeaders)
- pjp.proceed(Array(modifiedHeaders))
+ pjp.proceed(Array[AnyRef](request, modifiedHeaders))
}
}