aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/test/scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-24 12:42:18 -0300
commit8ed58faae77db3773ded06ccb8e8529cfb9031d6 (patch)
tree80a511fc42d81e21c17bdd919e4700953c8a0714 /kamon-spray/src/test/scala
parent1b7442f20d65e4a4b43b995acf8a7af538714913 (diff)
downloadKamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.gz
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.tar.bz2
Kamon-8ed58faae77db3773ded06ccb8e8529cfb9031d6.zip
complete automatic trace token propagation for spray-client, closes #14
Diffstat (limited to 'kamon-spray/src/test/scala')
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala251
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala106
2 files changed, 267 insertions, 90 deletions
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 0b1db1b7..9469924a 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -1,27 +1,49 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.spray
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
-import org.scalatest.WordSpecLike
+import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
-import kamon.Kamon
-import kamon.metrics.{ TraceMetrics, Metrics }
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.trace.TraceRecorder
-import spray.can.client.ClientRequestInstrumentation
import com.typesafe.config.ConfigFactory
import spray.can.Http
-import akka.pattern.pipe
+import spray.http.HttpHeaders.RawHeader
+import kamon.Kamon
+import kamon.metrics.{ TraceMetrics, Metrics }
import spray.client.pipelining
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import spray.can.client.ClientRequestInstrumentation
import scala.concurrent.duration._
+import akka.pattern.pipe
+import kamon.metrics.TraceMetrics.TraceMetricSnapshot
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with RequestBuilding with TestServer {
- implicit lazy val system: ActorSystem = ActorSystem("server-request-tracing-spec", ConfigFactory.parseString(
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
+ implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
"""
+ |akka {
+ | loglevel = ERROR
+ |}
+ |
|kamon {
| metrics {
- | tick-interval = 1 second
+ | tick-interval = 2 seconds
|
| filters = [
| {
@@ -37,79 +59,172 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
implicit def ec = system.dispatcher
- "the client instrumentation" should {
- "record the elapsed time for a http request when using the Http manager directly and tag it as SprayTime" in {
+ "the client instrumentation" when {
+ "configured to do automatic-trace-token-propagation" should {
+ "include the trace token header on spray-client requests" in {
+ enableAutomaticTraceTokenPropagation()
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
+ TraceRecorder.currentContext
+ }
- // Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("direct-to-http-manager-request") {
- client.send(hostConnector, Get("/direct-to-http-manager-request"))
- TraceRecorder.currentContext
- }
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
- // Accept the connection at the server side
- server.expectMsgType[Http.Connected]
- server.reply(Http.Register(server.ref))
-
- // Receive the request and reply back
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
-
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
- metrics.filterKeys(_.name == "direct-to-http-manager-request").exists {
- case (group, snapshot) ⇒
- snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.SprayTime).nonEmpty
- }
- case other ⇒ false
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ testContext.map(_.finish(Map.empty))
}
}
- "record the elapsed time for a http request when using the pipelining sendReceive and tag it as UserTime" in {
+ "not configured to do automatic-trace-token-propagation" should {
+ "not include the trace token header on spray-client requests" in {
+ disableAutomaticTraceTokenPropagation()
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 10 seconds)
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
+ TraceRecorder.currentContext
+ }
- // Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("pipelining-helper-request") {
- pipeline(Get("/pipelining-helper-request")) to client.ref
- TraceRecorder.currentContext
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+ testContext.map(_.finish(Map.empty))
}
+ }
- // Accept the connection at the server side
- server.expectMsgType[Http.Connected]
- server.reply(Http.Register(server.ref))
-
- // Receive the request and reply back
- server.expectMsgType[HttpRequest]
- server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
-
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒
- metrics.filterKeys(_.name == "pipelining-helper-request").exists {
- case (group, snapshot) ⇒
- snapshot.metrics.filterKeys(id ⇒ id.name == "" && id.tag == ClientRequestInstrumentation.UserTime).nonEmpty
- }
- case other ⇒ false
+ "configured to use pipelining segment collection strategy" should {
+ "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
+ enablePipeliningSegmentCollectionStrategy()
+
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
+
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
+ pipeline(Get("/dummy-path")) to client.ref
+ TraceRecorder.currentContext
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val req = server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+
+ val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
+ traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
+ traceMetrics.segments should not be empty
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.UserTime } map (_._2)
+ recordedSegment should not be empty
+ recordedSegment map { segmentMetrics ⇒
+ segmentMetrics.numberOfMeasurements should be(1L)
+ }
}
}
+
+ "configured to use internal segment collection strategy" should {
+ "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
+ enableInternalSegmentCollectionStrategy()
+
+ val (hostConnector, server) = buildSHostConnectorAndServer
+ val client = TestProbe()
+ val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
+
+ // Initiate a request within the context of a trace
+ val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
+ pipeline(Get("/dummy-path")) to client.ref
+ TraceRecorder.currentContext
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
+
+ // Finish the trace
+ testContext.map(_.finish(Map.empty))
+
+ val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
+ traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
+ traceMetrics.segments should not be empty
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.tag == ClientRequestInstrumentation.SprayTime } map (_._2)
+ recordedSegment should not be empty
+ recordedSegment map { segmentMetrics ⇒
+ segmentMetrics.numberOfMeasurements should be(1L)
+ }
+ }
+ }
+ }
+
+ def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricSnapshot = {
+ val tickSnapshot = within(timeout) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+
+ val metricsOption = tickSnapshot.metrics.get(TraceMetrics(traceName))
+ metricsOption should not be empty
+ metricsOption.get.asInstanceOf[TraceMetricSnapshot]
}
+ def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
+ def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
+ def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
+ def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
+
+ def setSegmentCollectionStrategy(strategy: ClientSegmentCollectionStrategy.Strategy): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("clientSegmentCollectionStrategy")
+ field.setAccessible(true)
+ field.set(target, strategy)
+ }
+
+ def setIncludeTraceToken(include: Boolean): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("includeTraceToken")
+ field.setAccessible(true)
+ field.set(target, include)
+ }
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index 1c5b56cd..7edbbe11 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -1,22 +1,23 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.spray
import _root_.spray.httpx.RequestBuilding
-import akka.testkit.{ TestProbe, TestKit }
+import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.Kamon
@@ -25,25 +26,58 @@ import spray.http.HttpHeaders.RawHeader
import spray.http.{ HttpResponse, HttpRequest }
import kamon.metrics.{ TraceMetrics, Metrics }
import kamon.metrics.Subscriptions.TickMetricSnapshot
+import com.typesafe.config.ConfigFactory
+import kamon.metrics.TraceMetrics.ElapsedTime
-class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding
+class ServerRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding
with ScalaFutures with PatienceConfiguration with TestServer {
+ implicit lazy val system: ActorSystem = ActorSystem("client-pipelining-segment-strategy-instrumentation-spec", ConfigFactory.parseString(
+ """
+ |akka {
+ | loglevel = ERROR
+ |}
+ |
+ |kamon {
+ | metrics {
+ | tick-interval = 2 seconds
+ |
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = []
+ | }
+ | }
+ | ]
+ | }
+ |
+ | spray {
+ | client {
+ | segment-collection-strategy = internal
+ | }
+ | }
+ |}
+ """.stripMargin))
+
"the spray server request tracing instrumentation" should {
- "reply back with the same trace token header provided in the request" in {
+ "include the trace-token header in responses when the automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
- client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token")))
+ client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled")))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers should contain(RawHeader("X-Trace-Token", "reply-trace-token"))
-
+ response.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-enabled"))
}
- "reply back with a automatically assigned trace token if none was provided with the request" in {
+ "reply back with an automatically assigned trace token if none was provided with the request and automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
@@ -52,28 +86,56 @@ class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with
server.reply(HttpResponse(entity = "ok"))
val response = client.expectMsgType[HttpResponse]
- response.headers.filter(_.name == "X-Trace-Token").size should be(1)
+ response.headers.filter(_.name == Kamon(Spray).traceTokenHeaderName).size should be(1)
}
+ "not include the trace-token header in responses when the automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+
+ val (connection, server) = buildClientConnectionAndServer
+ val client = TestProbe()
+
+ client.send(connection, Get("/").withHeaders(RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled")))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ val response = client.expectMsgType[HttpResponse]
+
+ response.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, "propagation-disabled"))
+ }
+
"open and finish a trace during the lifetime of a request" in {
val (connection, server) = buildClientConnectionAndServer
val client = TestProbe()
val metricListener = TestProbe()
Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+ metricListener.expectMsgType[TickMetricSnapshot]
client.send(connection, Get("/open-and-finish"))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- metricListener.fishForMessage() {
- case snapshot @ TickMetricSnapshot(_, _, metrics) ⇒ metrics.keys.exists(_.name.contains("open-and-finish"))
- case other ⇒ false
+ val tickSnapshot = metricListener.expectMsgType[TickMetricSnapshot]
+ val traceMetrics = tickSnapshot.metrics.find { case (k, v) ⇒ k.name.contains("open-and-finish") } map (_._2.metrics)
+ traceMetrics should not be empty
+
+ traceMetrics map { metrics ⇒
+ metrics(ElapsedTime).numberOfMeasurements should be(1L)
}
}
}
+ def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
+ def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)
+
+ def setIncludeTraceToken(include: Boolean): Unit = {
+ val target = Kamon(Spray)(system)
+ val field = target.getClass.getDeclaredField("includeTraceToken")
+ field.setAccessible(true)
+ field.set(target, include)
+ }
+
}