path: root/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
diff options
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
commit8ed58faae77db3773ded06ccb8e8529cfb9031d6 (patch)
tree80a511fc42d81e21c17bdd919e4700953c8a0714 /kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
parent1b7442f20d65e4a4b43b995acf8a7af538714913 (diff)
complete automatic trace token propagation for spray-client, closes #14
Diffstat (limited to 'kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala')
1 files changed, 183 insertions, 68 deletions
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 0b1db1b7..9469924a 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -1,27 +1,49 @@
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
package kamon.spray
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
-import org.scalatest.WordSpecLike
+import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
-import kamon.Kamon
-import kamon.metrics.{ TraceMetrics, Metrics }
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.trace.TraceRecorder
-import spray.can.client.ClientRequestInstrumentation
import com.typesafe.config.ConfigFactory
import spray.can.Http
-import akka.pattern.pipe
+import spray.http.HttpHeaders.RawHeader
+import kamon.Kamon
+import kamon.metrics.{ TraceMetrics, Metrics }
import spray.client.pipelining
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import spray.can.client.ClientRequestInstrumentation
import scala.concurrent.duration._
+import akka.pattern.pipe
+import kamon.metrics.TraceMetrics.TraceMetricSnapshot
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer {
- implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString(
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
+ implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
+ |akka {
+ | loglevel = ERROR
+ |}
+ |
|kamon {
| metrics {
- | tick-interval = 1 second
+ | tick-interval = 2 seconds
| filters = [
| {
@@ -37,79 +59,172 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
implicit def ec = system.dispatcher
- "the client instrumentation" should {
- "record the elapsed time for a http request when using the Http manager directly and tag it as SprayTime" in {
+ "the client instrumentation" when {
+ "configured to do automatic-trace-token-propagation" should {
+ "include the trace token header on spray-client requests" in {
+ enableAutomaticTraceTokenPropagation()
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
+ TraceRecorder.currentContext
+ }
- // Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") {
- client.send(hostConnector, Get("/direct-to-http-manager-request"))
- TraceRecorder.currentContext
- }
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
- // Accept the connection at the server side
- server.expectMsgType[Http.Connected]
- server.reply(Http.Register(server.ref))
- // Receive the request and reply back
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
- // Finish the trace
- testContext.map(_.finish(Map.empty))
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
- metrics.filterKeys(_.name == "direct-to-http-manager-request").exists {
- case (group, snapshot) ⇒
- snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.SprayTime).nonEmpty
- }
- case other ⇒ false
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ testContext.map(_.finish(Map.empty))
- "record the elapsed time for a http request when using the pipelining sendReceive and tag it as UserTime" in {
+ "not configured to do automatic-trace-token-propagation" should {
+ "not include the trace token header on spray-client requests" in {
+ disableAutomaticTraceTokenPropagation()
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 10 seconds)
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
+ TraceRecorder.currentContext
+ }
- // Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("pipelining-helper-request") {
- pipeline(Get("/pipelining-helper-request")) to client.ref
- TraceRecorder.currentContext
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ testContext.map(_.finish(Map.empty))
+ }
- // Accept the connection at the server side
- server.expectMsgType[Http.Connected]
- server.reply(Http.Register(server.ref))
- // Receive the request and reply back
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
- // Finish the trace
- testContext.map(_.finish(Map.empty))
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
- metrics.filterKeys(_.name == "pipelining-helper-request").exists {
- case (group, snapshot) ⇒
- snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.UserTime).nonEmpty
- }
- case other ⇒ false
+ "configured to use pipelining segment collection strategy" should {
+ "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
+ enablePipeliningSegmentCollectionStrategy()
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
+ pipeline(Get("/dummy-path")) to client.ref
+ TraceRecorder.currentContext
+ }
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+ // Receive the request and reply back
+ val req = server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+ val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
+ traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
+ traceMetrics.segments should not be empty
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.UserTime } map (_._2)
+ recordedSegment should not be empty
+ recordedSegment map { segmentMetrics ⇒
+ segmentMetrics.numberOfMeasurements should be(1L)
+ }
+ "configured to use internal segment collection strategy" should {
+ "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
+ enableInternalSegmentCollectionStrategy()
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
+ pipeline(Get("/dummy-path")) to client.ref
+ TraceRecorder.currentContext
+ }
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+ val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
+ traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
+ traceMetrics.segments should not be empty
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.SprayTime } map (_._2)
+ recordedSegment should not be empty
+ recordedSegment map { segmentMetrics ⇒
+ segmentMetrics.numberOfMeasurements should be(1L)
+ }
+ }
+ }
+ }
+ def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricSnapshot = {
+ val tickSnapshot = within(timeout) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+ val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName))
+ metricsOption should not be empty
+ metricsOption.get.asInstanceOf[TraceMetricSnapshot]
+ def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
+ def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
+ def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
+ def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
+ def setSegmentCollectionStrategy(strategy: ClientSegmentCollectionStrategy.Strategy): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("clientSegmentCollectionStrategy")
+ field.setAccessible(true)
+ field.set(target, strategy)
+ }
+ def setIncludeTraceToken(include: Boolean): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("includeTraceToken")
+ field.setAccessible(true)
+ field.set(target, include)
+ }