aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/main
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
commit8ed58faae77db3773ded06ccb8e8529cfb9031d6 (patch)
tree80a511fc42d81e21c17bdd919e4700953c8a0714 /kamon-spray/src/main
parent1b7442f20d65e4a4b43b995acf8a7af538714913 (diff)
downloadKamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.gz
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.bz2
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.zip
complete automatic trace token propagation for spray-client, closes #14
Diffstat (limited to 'kamon-spray/src/main')
-rw-r--r--kamon-spray/src/main/resources/reference.conf27
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/Spray.scala17
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala78
3 files changed, 93 insertions, 29 deletions
diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf
index 88bd8fb8..67d191c7 100644
--- a/kamon-spray/src/main/resources/reference.conf
+++ b/kamon-spray/src/main/resources/reference.conf
@@ -1,6 +1,31 @@
kamon {
spray {
- include-trace-token-header = true
+ # Header name used when propagating the `TraceContext.token` value across applications.
trace-token-header-name = "X-Trace-Token"
+
+ # When set to true, Kamon will automatically set and propogate the `TraceContext.token` value under the following
+ # conditions:
+ # - When a server side request is received containing the trace token header, the new `TraceContext` will have that
+ # some token, and once the response to that request is ready, the trace token header is also included in the
+ # response.
+ # - When a spray-client request is issued and a `TraceContext` is available, the trace token header will be included
+ # in the `HttpRequest` headers.
+ automatic-trace-token-propagation = true
+
+
+ client {
+ # Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values
+ # are:
+ # - pipelining: measures the time during which the user application code is waiting for a spray-client request to
+ # complete, by attaching a callback to the Future[HttpResponse] returned by `spray.client.pipelining.sendReceive`.
+ # If `spray.client.pipelining.sendReceive` is not used, the segment measurement wont be performed.
+ # - internal: measures the internal time taken by spray-client to finish a request. Sometimes the user application
+ # code has a finite future timeout (like when using `spray.client.pipelining.sendReceive`) that doesn't match
+ # the actual amount of time spray might take internally to resolve a request, counting retries, redirects,
+ # connection timeouts and so on. If using the internal strategy, the measured time will include the entire time
+ # since the request has been received by the corresponding `HttpHostConnector` until a response is sent back
+ # to the requester.
+ segment-collection-strategy = pipelining
+ }
}
} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
index 4dc98d85..9de1882a 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
@@ -24,14 +24,29 @@ import spray.http.HttpRequest
object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = Spray
def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system)
+
+}
+
+object ClientSegmentCollectionStrategy {
+ sealed trait Strategy
+ case object Pipelining extends Strategy
+ case object Internal extends Strategy
}
class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
private val config = system.settings.config.getConfig("kamon.spray")
- val includeTraceToken: Boolean = config.getBoolean("include-trace-token-header")
+ val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation")
val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+ val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
+ config.getString("client.segment-collection-strategy") match {
+ case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining
+ case "internal" ⇒ ClientSegmentCollectionStrategy.Internal
+ case other ⇒ throw new IllegalArgumentException(s"Configured segment-collection-strategy [$other] is invalid, " +
+ s"only pipelining and internal are valid options.")
+ }
+
// Later we should expose a way for the user to customize this.
def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address
}
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index 4e2a352c..d7d9cf09 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -1,29 +1,29 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpResponse, HttpMessageEnd, HttpRequest }
-import spray.http.HttpHeaders.Host
-import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware, TraceContextAware }
+import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
+import spray.http.HttpHeaders.{ RawHeader, Host }
+import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware }
import kamon.metrics.TraceMetrics.HttpClientRequest
import kamon.Kamon
-import kamon.spray.Spray
+import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
import scala.concurrent.{ Future, ExecutionContext }
import akka.util.Timeout
@@ -43,14 +43,18 @@ class ClientRequestInstrumentation {
// The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
// completion handle the first time we create one.
- // The read to ctx.completionHandle should take care of initializing the aspect timely.
+ // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
if (ctx.segmentCompletionHandle.isEmpty) {
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+ val sprayExtension = Kamon(Spray)(traceContext.system)
- ctx.segmentCompletionHandle = Some(completionHandle)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+
+ ctx.segmentCompletionHandle = Some(completionHandle)
+ }
}
}
}
@@ -92,15 +96,18 @@ class ClientRequestInstrumentation {
(request: HttpRequest) ⇒ {
val responseFuture = originalSendReceive.apply(request)
-
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+ val sprayExtension = Kamon(Spray)(traceContext.system)
- responseFuture.onComplete { result ⇒
- completionHandle.finish(Map.empty)
- }(ec)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+
+ responseFuture.onComplete { result ⇒
+ completionHandle.finish(Map.empty)
+ }(ec)
+ }
}
responseFuture
@@ -114,6 +121,23 @@ class ClientRequestInstrumentation {
"path" -> request.uri.path.toString(),
"method" -> request.method.toString())
}
+
+ @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
+
+ @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
+ val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒
+ val sprayExtension = Kamon(Spray)(traceContext.system)
+
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders
+ else
+ defaultHeaders
+ } getOrElse defaultHeaders
+
+ pjp.proceed(Array(modifiedHeaders))
+ }
}
object ClientRequestInstrumentation {