/*
 * =========================================================================================
 * Copyright © 2013 the kamon project
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.spray

import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
import kamon.trace.{ SegmentMetricIdentity, TraceRecorder }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
import kamon.Kamon
import kamon.metric.{ TraceMetrics, Metrics }
import spray.client.pipelining
import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
import akka.pattern.pipe
import kamon.metric.TraceMetrics.TraceMetricsSnapshot

/**
 *  Exercises the spray client instrumentation: trace token header propagation on outgoing
 *  requests and segment recording under the "pipelining" and "internal" collection strategies.
 *
 *  Each test drives a request through a host connector towards a [[akka.testkit.TestProbe]]
 *  acting as the server (both obtained from `buildSHostConnectorAndServer`), then inspects
 *  either the request received by the server or the metrics snapshot delivered to a subscriber.
 */
class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {

  implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
    """
      |akka {
      |  loglevel = ERROR
      |}
      |
      |kamon {
      |  metrics {
      |    tick-interval = 2 seconds
      |
      |    filters = [
      |      {
      |        trace {
      |          includes = [ "*" ]
      |          excludes = []
      |        }
      |      }
      |    ]
      |  }
      |}
    """.stripMargin))

  implicit def ec = system.dispatcher

  "the client instrumentation" when {
    "configured to do automatic-trace-token-propagation" should {
      "include the trace token header on spray-client requests" in {
        enableAutomaticTraceTokenPropagation()

        val (hostConnector, server) = buildSHostConnectorAndServer
        val client = TestProbe()

        // Initiate a request within the context of a trace
        val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
          client.send(hostConnector, Get("/dummy-path"))
          TraceRecorder.currentContext
        }

        // Accept the connection at the server side
        server.expectMsgType[Http.Connected]
        server.reply(Http.Register(server.ref))

        // Receive the request and check that the trace token header is present
        val request = server.expectMsgType[HttpRequest]
        request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))

        // Finish the request cycle, just to avoid error messages on the logs.
        server.reply(HttpResponse(entity = "ok"))
        client.expectMsgType[HttpResponse]

        testContext.finish()
      }
    }

    "not configured to do automatic-trace-token-propagation" should {
      "not include the trace token header on spray-client requests" in {
        disableAutomaticTraceTokenPropagation()

        val (hostConnector, server) = buildSHostConnectorAndServer
        val client = TestProbe()

        // Initiate a request within the context of a trace
        val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
          client.send(hostConnector, Get("/dummy-path"))
          TraceRecorder.currentContext
        }

        // Accept the connection at the server side
        server.expectMsgType[Http.Connected]
        server.reply(Http.Register(server.ref))

        // Receive the request and check that the trace token header is absent
        val request = server.expectMsgType[HttpRequest]
        request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))

        // Finish the request cycle, just to avoid error messages on the logs.
        server.reply(HttpResponse(entity = "ok"))
        client.expectMsgType[HttpResponse]

        testContext.finish()
      }
    }

    "configured to use pipelining segment collection strategy" should {
      "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
        enablePipeliningSegmentCollectionStrategy()

        val (hostConnector, server) = buildSHostConnectorAndServer
        val client = TestProbe()
        val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)

        // Subscribe to trace metrics and consume one snapshot before starting the trace.
        val metricListener = TestProbe()
        Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
        metricListener.expectMsgType[TickMetricSnapshot]

        // Initiate a request within the context of a trace
        val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
          pipeline(Get("/dummy-path")) to client.ref
          TraceRecorder.currentContext
        }

        // Accept the connection at the server side
        server.expectMsgType[Http.Connected]
        server.reply(Http.Register(server.ref))

        // Receive the request and reply back
        server.expectMsgType[HttpRequest]
        server.reply(HttpResponse(entity = "ok"))
        client.expectMsgType[HttpResponse]

        // Finish the trace
        testContext.finish()

        val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
        expectSingleTraceWithSingleSegment(traceMetrics)
      }
    }

    "configured to use internal segment collection strategy" should {
      "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
        enableInternalSegmentCollectionStrategy()

        val (hostConnector, server) = buildSHostConnectorAndServer
        val client = TestProbe()
        val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)

        // Subscribe to trace metrics and consume one snapshot before starting the trace.
        val metricListener = TestProbe()
        Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
        metricListener.expectMsgType[TickMetricSnapshot]

        // Initiate a request within the context of a trace
        val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
          pipeline(Get("/dummy-path")) to client.ref
          TraceRecorder.currentContext
        }

        // Accept the connection at the server side
        server.expectMsgType[Http.Connected]
        server.reply(Http.Register(server.ref))

        // Receive the request and reply back
        server.expectMsgType[HttpRequest]
        server.reply(HttpResponse(entity = "ok"))
        client.expectMsgType[HttpResponse]

        // Finish the trace
        testContext.finish()

        val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
        expectSingleTraceWithSingleSegment(traceMetrics)
      }
    }
  }

  /**
   *  Waits for the next [[TickMetricSnapshot]] on `listener` and extracts the trace metrics
   *  recorded under `traceName`.
   *
   *  @param traceName name of the trace whose metrics are expected in the snapshot
   *  @param listener  probe that is subscribed to the metrics ticks
   *  @param timeout   maximum time to wait for the snapshot to arrive at `listener`
   *  @return the snapshot stored under `TraceMetrics(traceName)`
   *
   *  Fails the test if no snapshot arrives in time, or if the snapshot holds no
   *  [[TraceMetricsSnapshot]] for the given trace name.
   */
  def expectTraceMetrics(traceName: String, listener: TestProbe, timeout: FiniteDuration): TraceMetricsSnapshot = {
    // The deadline has to be set on the probe itself: `within` called on this spec only
    // constrains expectations made through the spec's own TestKit, not through `listener`.
    val tickSnapshot = listener.within(timeout) {
      listener.expectMsgType[TickMetricSnapshot]
    }

    tickSnapshot.metrics.get(TraceMetrics(traceName)) match {
      case Some(snapshot: TraceMetricsSnapshot) ⇒ snapshot
      case unexpected                           ⇒ fail(s"Expected a TraceMetricsSnapshot for trace [$traceName] but found: $unexpected")
    }
  }

  /**
   *  Asserts that `traceMetrics` holds exactly one elapsed time measurement and that the
   *  first entry keyed by a [[SegmentMetricIdentity]] holds exactly one measurement as well.
   */
  private def expectSingleTraceWithSingleSegment(traceMetrics: TraceMetricsSnapshot): Unit = {
    traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
    traceMetrics.segments should not be empty

    val recordedSegment = traceMetrics.segments.collectFirst {
      case (_: SegmentMetricIdentity, segmentMetrics) ⇒ segmentMetrics
    }

    recordedSegment should not be empty
    recordedSegment foreach { segmentMetrics ⇒
      segmentMetrics.numberOfMeasurements should be(1L)
    }
  }

  /** Switches the Spray extension to the internal segment collection strategy. */
  def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)

  /** Switches the Spray extension to the pipelining segment collection strategy. */
  def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)

  /** Sets the Spray extension's `includeTraceToken` field to `true`. */
  def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)

  /** Sets the Spray extension's `includeTraceToken` field to `false`. */
  def disableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(false)

  /** Overwrites the Spray extension's `clientSegmentCollectionStrategy` field via reflection. */
  def setSegmentCollectionStrategy(strategy: ClientSegmentCollectionStrategy.Strategy): Unit =
    setSprayExtensionField("clientSegmentCollectionStrategy", strategy)

  /** Overwrites the Spray extension's `includeTraceToken` field via reflection. */
  def setIncludeTraceToken(include: Boolean): Unit =
    setSprayExtensionField("includeTraceToken", include)

  /**
   *  Reflectively writes `value` into the field named `fieldName`, declared on the runtime
   *  class of the Spray extension instance registered for this spec's actor system.
   */
  private def setSprayExtensionField(fieldName: String, value: Any): Unit = {
    val target = Kamon(Spray)(system)
    val field = target.getClass.getDeclaredField(fieldName)
    field.setAccessible(true)
    field.set(target, value)
  }
}