aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
commit8ed58faae77db3773ded06ccb8e8529cfb9031d6 (patch)
tree80a511fc42d81e21c17bdd919e4700953c8a0714 /kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
parent1b7442f20d65e4a4b43b995acf8a7af538714913 (diff)
downloadKamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.gz
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.bz2
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.zip
complete automatic trace token propagation for spray-client, closes #14
Diffstat (limited to 'kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala')
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala78
1 files changed, 51 insertions, 27 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index 4e2a352c..d7d9cf09 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -1,29 +1,29 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpResponse, HttpMessageEnd, HttpRequest }
-import spray.http.HttpHeaders.Host
-import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware, TraceContextAware }
+import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
+import spray.http.HttpHeaders.{ RawHeader, Host }
+import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware }
import kamon.metrics.TraceMetrics.HttpClientRequest
import kamon.Kamon
-import kamon.spray.Spray
+import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
import scala.concurrent.{ Future, ExecutionContext }
import akka.util.Timeout
@@ -43,14 +43,18 @@ class ClientRequestInstrumentation {
// The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
// completion handle the first time we create one.
- // The read to ctx.completionHandle should take care of initializing the aspect timely.
+ // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
if (ctx.segmentCompletionHandle.isEmpty) {
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+ val sprayExtension = Kamon(Spray)(traceContext.system)
- ctx.segmentCompletionHandle = Some(completionHandle)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+
+ ctx.segmentCompletionHandle = Some(completionHandle)
+ }
}
}
}
@@ -92,15 +96,18 @@ class ClientRequestInstrumentation {
(request: HttpRequest) ⇒ {
val responseFuture = originalSendReceive.apply(request)
-
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+ val sprayExtension = Kamon(Spray)(traceContext.system)
- responseFuture.onComplete { result ⇒
- completionHandle.finish(Map.empty)
- }(ec)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+
+ responseFuture.onComplete { result ⇒
+ completionHandle.finish(Map.empty)
+ }(ec)
+ }
}
responseFuture
@@ -114,6 +121,23 @@ class ClientRequestInstrumentation {
"path" -> request.uri.path.toString(),
"method" -> request.method.toString())
}
+
+ @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
+
+ @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
+ val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒
+ val sprayExtension = Kamon(Spray)(traceContext.system)
+
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders
+ else
+ defaultHeaders
+ } getOrElse defaultHeaders
+
+ pjp.proceed(Array(modifiedHeaders))
+ }
}
object ClientRequestInstrumentation {