aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala')
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala224
1 files changed, 152 insertions, 72 deletions
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 54329645..57f9ebe1 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -18,22 +18,23 @@ package kamon.spray
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
import kamon.Kamon
import kamon.metric.{ TraceMetrics, Metrics }
-import spray.client.pipelining
+import spray.client.pipelining.sendReceive
import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
-import akka.pattern.pipe
-import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
"""
|akka {
@@ -41,8 +42,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
|}
|
|kamon {
+ | spray {
+ | name-generator = kamon.spray.TestSprayNameGenerator
+ | }
+ |
| metrics {
- | tick-interval = 2 seconds
+ | tick-interval = 1 hour
|
| filters = [
| {
@@ -57,19 +62,48 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
""".stripMargin))
implicit def ec = system.dispatcher
+ implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis))
- "the client instrumentation" when {
- "configured to do automatic-trace-token-propagation" should {
- "include the trace token header on spray-client requests" in {
+ "the spray client instrumentation" when {
+ "using the request-level api" should {
+ "include the trace token header if automatic-trace-token-propagation is enabled" in {
enableAutomaticTraceTokenPropagation()
+ val (_, server, bound) = buildSHostConnectorAndServer
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+ }
+
+ "not include the trace token header if automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+ val (_, server, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
- client.send(hostConnector, Get("/dummy-path"))
- TraceRecorder.currentContext
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
+ }
+
+ (TraceRecorder.currentContext, rF)
}
// Accept the connection at the server side
@@ -78,24 +112,82 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+ }
+
+ "start and finish a segment that must be named using the request level api name assignation" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val transport = TestProbe()
+ val (_, _, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") {
+ val rF = sendReceive(transport.ref)(ec, 10.seconds) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Receive the request and reply back
+ transport.expectMsgType[HttpRequest]
+ transport.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
+ }
+
+ "rename a request level api segment once it reaches the relevant host connector" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val (_, server, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
}
- "not configured to do automatic-trace-token-propagation" should {
- "not include the trace token header on spray-client requests" in {
- disableAutomaticTraceTokenPropagation()
+ "using the host-level api" should {
+ "include the trace token header on spray-client requests if automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+ enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
client.send(hostConnector, Get("/dummy-path"))
TraceRecorder.currentContext
}
@@ -106,30 +198,24 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
}
- }
- "configured to use pipelining segment collection strategy" should {
- "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
- enablePipeliningSegmentCollectionStrategy()
+ "not include the trace token header on spray-client requests if automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+ enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
- pipeline(Get("/dummy-path")) to client.ref
+ val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
TraceRecorder.currentContext
}
@@ -138,39 +224,25 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
server.reply(Http.Register(server.ref))
// Receive the request and reply back
- val req = server.expectMsgType[HttpRequest]
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
-
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
- traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
- traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
- recordedSegment should not be empty
- recordedSegment map { segmentMetrics ⇒
- segmentMetrics.numberOfMeasurements should be(1L)
- }
+ testContext.finish()
}
- }
- "configured to use internal segment collection strategy" should {
- "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
+ "start and finish a segment that must be named using the host level api name assignation" in {
+ disableAutomaticTraceTokenPropagation()
enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
- pipeline(Get("/dummy-path")) to client.ref
+ val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") {
+ client.send(hostConnector, Get("/host-level-api-segment"))
TraceRecorder.currentContext
}
@@ -179,21 +251,17 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
server.reply(Http.Register(server.ref))
// Receive the request and reply back
- server.expectMsgType[HttpRequest]
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
+ testContext.finish()
- // Finish the trace
- testContext.map(_.finish(Map.empty))
-
- val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
- traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
- traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
- recordedSegment should not be empty
- recordedSegment map { segmentMetrics ⇒
- segmentMetrics.numberOfMeasurements should be(1L)
- }
+ val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
}
}
@@ -208,6 +276,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
metricsOption.get.asInstanceOf[TraceMetricsSnapshot]
}
+ def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
+ val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
+ recorder.get.collect(collectionContext)
+ }
+
def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
@@ -227,3 +301,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
field.set(target, include)
}
}
+
+class TestSprayNameGenerator extends SprayNameGenerator {
+ def generateTraceName(request: HttpRequest): String = request.uri.path.toString()
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = "request-level " + request.uri.path.toString()
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = "host-level " + request.uri.path.toString()
+}