aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala')
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala152
1 files changed, 152 insertions, 0 deletions
diff --git a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
new file mode 100644
index 00000000..fa9063ad
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
@@ -0,0 +1,152 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package spray.can.client
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http._
+import spray.http.HttpHeaders.RawHeader
+import kamon.trace._
+import kamon.spray.{ ClientInstrumentationLevel, Spray }
+import akka.actor.ActorRef
+import scala.concurrent.{ Future, ExecutionContext }
+import akka.util.Timeout
+
+@Aspect
+class ClientRequestInstrumentation {
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default
+
+ @DeclareMixin("spray.http.HttpRequest")
+ def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default
+
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
+ def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {}
+
+ @After("requestContextCreation(requestContext, request)")
+ def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {
+ // This read to requestContext.traceContext takes care of initializing the aspect timely.
+ requestContext.traceContext
+
+ TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+
+ if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.HostLevelAPI) {
+ if (requestContext.segment.isEmpty) {
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ requestContext.segment = segment
+ }
+
+ } else {
+
+ // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a
+ // name again here is that when the request was initially sent it might not have the Host information available
+ // and it might be important to decide a proper segment name.
+
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ request.asInstanceOf[SegmentAware].segment.rename(clientRequestName)
+ }
+ }
+ }
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
+ def copyingRequestContext(old: TraceContextAware): Unit = {}
+
+ @Around("copyingRequestContext(old)")
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = {
+ TraceContext.withContext(old.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
+ def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {}
+
+ @Around("dispatchToCommander(requestContext, message)")
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
+ if (requestContext.traceContext.nonEmpty) {
+ TraceContext.withContext(requestContext.traceContext) {
+ if (message.isInstanceOf[HttpMessageEnd])
+ requestContext.asInstanceOf[SegmentAware].segment.finish()
+
+ pjp.proceed()
+ }
+ } else pjp.proceed()
+ }
+
+ @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)")
+ def copyingHttpRequest(old: SegmentAware): Unit = {}
+
+ @Around("copyingHttpRequest(old)")
+ def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware]
+ copiedHttpRequest.segment = old.segment
+ copiedHttpRequest
+ }
+
+ @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
+ def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {}
+
+ @Around("requestLevelApiSendReceive(transport, ec, timeout)")
+ def aroundRequestLevelApiSendReceive(pjp: ProceedingJoinPoint, transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Any = {
+ val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
+
+ (request: HttpRequest) ⇒ {
+ TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+ val segment =
+ if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.RequestLevelAPI)
+ ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName)
+ else
+ EmptyTraceContext.EmptySegment
+
+ request.asInstanceOf[SegmentAware].segment = segment
+
+ val responseFuture = originalSendReceive.apply(request)
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
+
+ responseFuture
+
+ } getOrElse (originalSendReceive.apply(request))
+ }
+ }
+
+ @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {}
+
+ @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = {
+
+ val modifiedHeaders = TraceContext.map { ctx ⇒
+ val sprayExtension = ctx.lookupExtension(Spray)
+ if (sprayExtension.settings.includeTraceTokenHeader)
+ RawHeader(sprayExtension.settings.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ else
+ defaultHeaders
+
+ } getOrElse (defaultHeaders)
+
+ pjp.proceed(Array[AnyRef](request, modifiedHeaders))
+ }
+} \ No newline at end of file