aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
commit8ed58faae77db3773ded06ccb8e8529cfb9031d6 (patch)
tree80a511fc42d81e21c17bdd919e4700953c8a0714 /kamon-spray
parent1b7442f20d65e4a4b43b995acf8a7af538714913 (diff)
downloadKamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.gz
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.bz2
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.zip
complete automatic trace token propagation for spray-client, closes #14
Diffstat (limited to 'kamon-spray')
-rw-r--r--kamon-spray/src/main/resources/reference.conf27
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/Spray.scala17
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala78
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala251
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala106
5 files changed, 360 insertions, 119 deletions
diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf
index 88bd8fb8..67d191c7 100644
--- a/kamon-spray/src/main/resources/reference.conf
+++ b/kamon-spray/src/main/resources/reference.conf
@@ -1,6 +1,31 @@
kamon {
spray {
- include-trace-token-header = true
+ # Header name used when propagating the `TraceContext.token` value across applications.
trace-token-header-name = "X-Trace-Token"
+
+ # When set to true, Kamon will automatically set and propogate the `TraceContext.token` value under the following
+ # conditions:
+ # - When a server side request is received containing the trace token header, the new `TraceContext` will have that
+ # some token, and once the response to that request is ready, the trace token header is also included in the
+ # response.
+ # - When a spray-client request is issued and a `TraceContext` is available, the trace token header will be included
+ # in the `HttpRequest` headers.
+ automatic-trace-token-propagation = true
+
+
+ client {
+ # Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values
+ # are:
+ # - pipelining: measures the time during which the user application code is waiting for a spray-client request to
+ # complete, by attaching a callback to the Future[HttpResponse] returned by `spray.client.pipelining.sendReceive`.
+ # If `spray.client.pipelining.sendReceive` is not used, the segment measurement wont be performed.
+ # - internal: measures the internal time taken by spray-client to finish a request. Sometimes the user application
+ # code has a finite future timeout (like when using `spray.client.pipelining.sendReceive`) that doesn't match
+ # the actual amount of time spray might take internally to resolve a request, counting retries, redirects,
+ # connection timeouts and so on. If using the internal strategy, the measured time will include the entire time
+ # since the request has been received by the corresponding `HttpHostConnector` until a response is sent back
+ # to the requester.
+ segment-collection-strategy = pipelining
+ }
}
} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
index 4dc98d85..9de1882a 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
@@ -24,14 +24,29 @@ import spray.http.HttpRequest
object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = Spray
def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system)
+
+}
+
+object ClientSegmentCollectionStrategy {
+ sealed trait Strategy
+ case object Pipelining extends Strategy
+ case object Internal extends Strategy
}
class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
private val config = system.settings.config.getConfig("kamon.spray")
- val includeTraceToken: Boolean = config.getBoolean("include-trace-token-header")
+ val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation")
val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+ val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
+ config.getString("client.segment-collection-strategy") match {
+ case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining
+ case "internal" ⇒ ClientSegmentCollectionStrategy.Internal
+ case other ⇒ throw new IllegalArgumentException(s"Configured segment-collection-strategy [$other] is invalid, " +
+ s"only pipelining and internal are valid options.")
+ }
+
// Later we should expose a way for the user to customize this.
def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address
}
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index 4e2a352c..d7d9cf09 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -1,29 +1,29 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpResponse, HttpMessageEnd, HttpRequest }
-import spray.http.HttpHeaders.Host
-import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware, TraceContextAware }
+import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
+import spray.http.HttpHeaders.{ RawHeader, Host }
+import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware }
import kamon.metrics.TraceMetrics.HttpClientRequest
import kamon.Kamon
-import kamon.spray.Spray
+import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
import scala.concurrent.{ Future, ExecutionContext }
import akka.util.Timeout
@@ -43,14 +43,18 @@ class ClientRequestInstrumentation {
// The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
// completion handle the first time we create one.
- // The read to ctx.completionHandle should take care of initializing the aspect timely.
+ // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
if (ctx.segmentCompletionHandle.isEmpty) {
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+ val sprayExtension = Kamon(Spray)(traceContext.system)
- ctx.segmentCompletionHandle = Some(completionHandle)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, SprayTime), requestAttributes)
+
+ ctx.segmentCompletionHandle = Some(completionHandle)
+ }
}
}
}
@@ -92,15 +96,18 @@ class ClientRequestInstrumentation {
(request: HttpRequest) ⇒ {
val responseFuture = originalSendReceive.apply(request)
-
TraceRecorder.currentContext.map { traceContext ⇒
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+ val sprayExtension = Kamon(Spray)(traceContext.system)
- responseFuture.onComplete { result ⇒
- completionHandle.finish(Map.empty)
- }(ec)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
+ val requestAttributes = basicRequestAttributes(request)
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName, UserTime), requestAttributes)
+
+ responseFuture.onComplete { result ⇒
+ completionHandle.finish(Map.empty)
+ }(ec)
+ }
}
responseFuture
@@ -114,6 +121,23 @@ class ClientRequestInstrumentation {
"path" -> request.uri.path.toString(),
"method" -> request.method.toString())
}
+
+ @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
+
+ @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
+ val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒
+ val sprayExtension = Kamon(Spray)(traceContext.system)
+
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders
+ else
+ defaultHeaders
+ } getOrElse defaultHeaders
+
+ pjp.proceed(Array(modifiedHeaders))
+ }
}
object ClientRequestInstrumentation {
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 0b1db1b7..9469924a 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -1,27 +1,49 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.spray
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
-import org.scalatest.WordSpecLike
+import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
-import kamon.Kamon
-import kamon.metrics.{ TraceMetrics, Metrics }
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.trace.TraceRecorder
-import spray.can.client.ClientRequestInstrumentation
import com.typesafe.config.ConfigFactory
import spray.can.Http
-import akka.pattern.pipe
+import spray.http.HttpHeaders.RawHeader
+import kamon.Kamon
+import kamon.metrics.{ TraceMetrics, Metrics }
import spray.client.pipelining
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import spray.can.client.ClientRequestInstrumentation
import scala.concurrent.duration._
+import akka.pattern.pipe
+import kamon.metrics.TraceMetrics.TraceMetricSnapshot
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer {
- implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString(
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
+ implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
"""
+ |akka {
+ | loglevel = ERROR
+ |}
+ |
|kamon {
| metrics {
- | tick-interval = 1 second
+ | tick-interval = 2 seconds
|
| filters = [
| {
@@ -37,79 +59,172 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
implicit def ec = system.dispatcher
- "the client instrumentation" should {
- "record the elapsed time for a http request when using the Http manager directly and tag it as SprayTime" in {
+ "the client instrumentation" when {
+ "configured to do automatic-trace-token-propagation" should {
+ "include the trace token header on spray-client requests" in {
+ enableAutomaticTraceTokenPropagation()
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
+ TraceRecorder.currentContext
+ }
- // Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") {
- client.send(hostConnector, Get("/direct-to-http-manager-request"))
- TraceRecorder.currentContext
- }
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
- // Accept the connection at the server side
- server.expectMsgType[Http.Connected]
- server.reply(Http.Register(server.ref))
-
- // Receive the request and reply back
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
-
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
- metrics.filterKeys(_.name == "direct-to-http-manager-request").exists {
- case (group, snapshot) ⇒
- snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.SprayTime).nonEmpty
- }
- case other ⇒ false
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ testContext.map(_.finish(Map.empty))
}
}
- "record the elapsed time for a http request when using the pipelining sendReceive and tag it as UserTime" in {
+ "not configured to do automatic-trace-token-propagation" should {
+ "not include the trace token header on spray-client requests" in {
+ disableAutomaticTraceTokenPropagation()
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 10 seconds)
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
+ TraceRecorder.currentContext
+ }
- // Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("pipelining-helper-request") {
- pipeline(Get("/pipelining-helper-request")) to client.ref
- TraceRecorder.currentContext
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ testContext.map(_.finish(Map.empty))
}
+ }
- // Accept the connection at the server side
- server.expectMsgType[Http.Connected]
- server.reply(Http.Register(server.ref))
-
- // Receive the request and reply back
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
-
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
- metrics.filterKeys(_.name == "pipelining-helper-request").exists {
- case (group, snapshot) ⇒
- snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.UserTime).nonEmpty
- }
- case other ⇒ false
+ "configured to use pipelining segment collection strategy" should {
+ "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
+ enablePipeliningSegmentCollectionStrategy()
+
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
+
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
+ pipeline(Get("/dummy-path")) to client.ref
+ TraceRecorder.currentContext
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val req = server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+
+ val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
+ traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
+ traceMetrics.segments should not be empty
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.UserTime } map (_._2)
+ recordedSegment should not be empty
+ recordedSegment map { segmentMetrics ⇒
+ segmentMetrics.numberOfMeasurements should be(1L)
+ }
}
}
+
+ "configured to use internal segment collection strategy" should {
+ "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
+ enableInternalSegmentCollectionStrategy()
+
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
+
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
+ pipeline(Get("/dummy-path")) to client.ref
+ TraceRecorder.currentContext
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+
+ val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
+ traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
+ traceMetrics.segments should not be empty
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.SprayTime } map (_._2)
+ recordedSegment should not be empty
+ recordedSegment map { segmentMetrics ⇒
+ segmentMetrics.numberOfMeasurements should be(1L)
+ }
+ }
+ }
+ }
+
+ def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricSnapshot = {
+ val tickSnapshot = within(timeout) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+
+ val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName))
+ metricsOption should not be empty
+ metricsOption.get.asInstanceOf[TraceMetricSnapshot]
}
+ def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
+ def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
+ def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
+ def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
+
+ def setSegmentCollectionStrategy(strategy: ClientSegmentCollectionStrategy.Strategy): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("clientSegmentCollectionStrategy")
+ field.setAccessible(true)
+ field.set(target, strategy)
+ }
+
+ def setIncludeTraceToken(include: Boolean): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("includeTraceToken")
+ field.setAccessible(true)
+ field.set(target, include)
+ }
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index 1c5b56cd..7edbbe11 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -1,22 +1,23 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.spray
import _root_.spray.httpx.RequestBuilding
-import akka.testkit.{ TestProbe, TestKit }
+import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.Kamon
@@ -25,25 +26,58 @@ import spray.http.HttpHeaders.RawHeader
import spray.http.{ HttpResponse, HttpRequest }
import kamon.metrics.{ TraceMetrics, Metrics }
import kamon.metrics.Subscriptions.TickMetricSnapshot
+import com.typesafe.config.ConfigFactory
+import kamon.metrics.TraceMetrics.ElapsedTime
-class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding
+class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
with ScalaFutures with PatienceConfiguration with TestServer {
+ implicit lazy val system: ActorSystem = ActorSystem("client-pipelining-segment-strategy-instrumentation-spec", ConfigFactory.parseString(
+ """
+ |akka {
+ | loglevel = ERROR
+ |}
+ |
+ |kamon {
+ | metrics {
+ | tick-interval = 2 seconds
+ |
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = []
+ | }
+ | }
+ | ]
+ | }
+ |
+ | spray {
+ | client {
+ | segment-collection-strategy = internal
+ | }
+ | }
+ |}
+ """.stripMargin))
+
"the spray server request tracing instrumentation" should {
- "reply back with the same trace token header provided in the request" in {
+ "include the trace-token header in responses when the automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
- client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token")))
+ client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled")))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers should contain(RawHeader("X-Trace-Token", "reply-trace-token"))
-
+ response.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled"))
}
- "reply back with a automatically assigned trace token if none was provided with the request" in {
+ "reply back with an automatically assigned trace token if none was provided with the request and automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
@@ -52,28 +86,56 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers.filter(_.name == "X-Trace-Token").size should be(1)
+ response.headers.filter(_.name == Kamon(Spray).traceTokenHeaderName).size should be(1)
}
+ "not include the trace-token header in responses when the automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+
+ val (connection, server) = buildClientConnectionAndServer
+ val client = TestProbe()
+
+ client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled")))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ val response = client.expectMsgType[HttpResponse]
+
+ response.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled"))
+ }
+
"open and finish a trace during the lifetime of a request" in {
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
val metricListener = TestProbe()
Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
client.send(connection, Get("/open-and-finish"))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒ metrics.keys.exists(_.name.contains("open-and-finish"))
- case other ⇒ false
+ val tickSnapshot = metricListener.expectMsgType[TickMetricSnapshot]
+ val traceMetrics = tickSnapshot.metrics.find { case (k, v) ⇒ k.name.contains("open-and-finish") } map (_._2.metrics)
+ traceMetrics should not be empty
+
+ traceMetrics map { metrics ⇒
+ metrics(ElapsedTime).numberOfMeasurements should be(1L)
}
}
}
+ def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
+ def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
+
+ def setIncludeTraceToken(include: Boolean): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("includeTraceToken")
+ field.setAccessible(true)
+ field.set(target, include)
+ }
+
}