aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-10-31 02:45:41 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-10-31 02:45:41 +0100
commit89d3057f8025add4b94b32c142e220ffb79f6c33 (patch)
treeca5cb3adccd9032450ec9f4fbfafb5542a52a315
parentcd8dce169b4231bf533445656bfb5a35034a6304 (diff)
downloadKamon-89d3057f8025add4b94b32c142e220ffb79f6c33.tar.gz
Kamon-89d3057f8025add4b94b32c142e220ffb79f6c33.tar.bz2
Kamon-89d3057f8025add4b94b32c142e220ffb79f6c33.zip
+ spray: external naming for traces and segments, related to #65
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala16
-rw-r--r--kamon-spray/src/main/resources/reference.conf3
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/Spray.scala20
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala121
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala2
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala214
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/TestServer.scala4
8 files changed, 255 insertions, 133 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
index 9b541a32..90928ba0 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
@@ -59,7 +59,7 @@ class ActorCellInstrumentation {
def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
val timestampBeforeProcessing = System.nanoTime()
- val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
+ val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware]
try {
TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
@@ -154,13 +154,13 @@ class ActorCellMetricsIntoActorCellMixin {
class TraceContextIntoEnvelopeMixin {
@DeclareMixin("akka.dispatch.Envelope")
- def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
+ def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default
@Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
- def envelopeCreation(ctx: TraceContextAware): Unit = {}
+ def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {}
@After("envelopeCreation(ctx)")
- def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
+ def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = {
// Necessary to force the initialization of ContextAware at the moment of creation.
ctx.traceContext
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 08289acf..c4c28a68 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -129,7 +129,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean,
def finish: Unit = {
val segmentFinishNanoTime = System.nanoTime()
- finishSegment(segmentName, label, (segmentFinishNanoTime - _segmentStartNanoTime))
+ finishSegment(name, label, (segmentFinishNanoTime - _segmentStartNanoTime))
}
}
}
@@ -155,7 +155,6 @@ object TraceContextOrigin {
}
trait TraceContextAware extends Serializable {
- def captureNanoTime: Long
def traceContext: TraceContext
}
@@ -163,7 +162,6 @@ object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
- @transient val captureNanoTime = System.nanoTime()
@transient val traceContext = TraceRecorder.currentContext
//
@@ -180,7 +178,17 @@ object TraceContextAware {
}
}
-trait SegmentAware extends TraceContextAware {
+trait TimestampedTraceContextAware extends TraceContextAware {
+ def captureNanoTime: Long
+}
+
+object TimestampedTraceContextAware {
+ def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware {
+ @transient val captureNanoTime = System.nanoTime()
+ }
+}
+
+trait SegmentAware {
@volatile var segment: Segment = EmptyTraceContext.EmptySegment
}
diff --git a/kamon-spray/src/main/resources/reference.conf b/kamon-spray/src/main/resources/reference.conf
index d497e681..9fed5a2b 100644
--- a/kamon-spray/src/main/resources/reference.conf
+++ b/kamon-spray/src/main/resources/reference.conf
@@ -16,6 +16,9 @@ kamon {
# in the `HttpRequest` headers.
automatic-trace-token-propagation = true
+ # Fully qualified name of the implementation of kamon.spray.SprayNameGenerator that will be used for assigning names
+ # to traces and client http segments.
+ name-generator = kamon.spray.SimpleSprayNameGenerator
client {
# Strategy used for automatic trace segment generation when issue requests with spray-client. The possible values
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
index 76adb214..4a0fd74e 100644
--- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
@@ -43,6 +43,9 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte
val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get
// It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it.
+ private val nameGeneratorFQN = config.getString("name-generator")
+ private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems.
+
val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy =
config.getString("client.segment-collection-strategy") match {
case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining
@@ -51,6 +54,19 @@ class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Exte
s"only pipelining and internal are valid options.")
}
- // Later we should expose a way for the user to customize this.
- def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address
+ def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request)
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request)
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request)
+}
+
+trait SprayNameGenerator {
+ def generateTraceName(request: HttpRequest): String
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String
+ def generateHostLevelApiSegmentName(request: HttpRequest): String
+}
+
+class SimpleSprayNameGenerator extends SprayNameGenerator {
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = request.method.value + ": " + request.uri.path
+ def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = request.uri.authority.host.address
}
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index cfd204df..94fc3572 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -18,7 +18,7 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
+import spray.http._
import spray.http.HttpHeaders.RawHeader
import kamon.trace._
import kamon.Kamon
@@ -31,60 +31,79 @@ import akka.util.Timeout
class ClientRequestInstrumentation {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixin: SegmentAware = SegmentAware.default
+ def mixinTraceContextAwareToRequestContext: TraceContextAware = TraceContextAware.default
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinSegmentAwareToRequestContext: SegmentAware = SegmentAware.default
+
+ @DeclareMixin("spray.http.HttpRequest")
+ def mixinSegmentAwareToHttpRequest: SegmentAware = SegmentAware.default
@Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
- def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {}
+ def requestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {}
@After("requestContextCreation(requestContext, request)")
- def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {
- // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
- // segment the first time we create one.
+ def afterRequestContextCreation(requestContext: SegmentAware with TraceContextAware, request: HttpRequest): Unit = {
+ // This read to requestContext.traceContext takes care of initializing the aspect timely.
+ requestContext.traceContext
- // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
- if (requestContext.segment.isEmpty) {
- TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒
- val sprayExtension = Kamon(Spray)(ctx.system)
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ if (requestContext.segment.isEmpty) {
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ requestContext.segment = segment
+ }
- requestContext.segment = segment
- }
+ } else {
- case EmptyTraceContext ⇒ // Nothing to do here.
+ // We have a Request Level API, let's just make sure that we rename it accordingly. The reason for assigning a
+ // name again here is that when the request was initially sent it might not have the Host information available
+ // and it might be important to decide a proper segment name.
+
+ val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request)
+ request.asInstanceOf[SegmentAware].segment.rename(clientRequestName)
}
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
- def copyingRequestContext(old: SegmentAware): Unit = {}
+ def copyingRequestContext(old: TraceContextAware): Unit = {}
@Around("copyingRequestContext(old)")
- def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = {
TraceRecorder.withInlineTraceContextReplacement(old.traceContext) {
pjp.proceed()
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
- def dispatchToCommander(requestContext: SegmentAware, message: Any): Unit = {}
+ def dispatchToCommander(requestContext: TraceContextAware, message: Any): Unit = {}
@Around("dispatchToCommander(requestContext, message)")
- def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = {
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
if (requestContext.traceContext.nonEmpty) {
TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) {
if (message.isInstanceOf[HttpMessageEnd])
- requestContext.segment.finish()
+ requestContext.asInstanceOf[SegmentAware].segment.finish()
pjp.proceed()
}
-
} else pjp.proceed()
}
+ @Pointcut("execution(* spray.http.HttpRequest.copy(..)) && this(old)")
+ def copyingHttpRequest(old: SegmentAware): Unit = {}
+
+ @Around("copyingHttpRequest(old)")
+ def aroundCopyingHttpRequest(pjp: ProceedingJoinPoint, old: SegmentAware): Any = {
+ val copiedHttpRequest = pjp.proceed().asInstanceOf[SegmentAware]
+ copiedHttpRequest.segment = old.segment
+ copiedHttpRequest
+ }
+
@Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
def requestLevelApiSendReceive(transport: ActorRef, ec: ExecutionContext, timeout: Timeout): Unit = {}
@@ -93,46 +112,42 @@ class ClientRequestInstrumentation {
val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]]
(request: HttpRequest) ⇒ {
- val responseFuture = originalSendReceive.apply(request)
-
- TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒
- val sprayExtension = Kamon(Spray)(ctx.system)
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
+ val segment =
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining)
+ ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentMetricIdentityLabel.HttpClient)
+ else
+ EmptyTraceContext.EmptySegment
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+ request.asInstanceOf[SegmentAware].segment = segment
- responseFuture.onComplete { result ⇒
- segment.finish()
- }(ec)
- }
+ val responseFuture = originalSendReceive.apply(request)
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
- case EmptyTraceContext ⇒ // Nothing to do here.
- }
+ responseFuture
- responseFuture
+ } getOrElse (originalSendReceive.apply(request))
}
-
}
- @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
- def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
+ @Pointcut("execution(* spray.http.HttpMessage.withDefaultHeaders(*)) && this(request) && args(defaultHeaders)")
+ def includingDefaultHeadersAtHttpHostConnector(request: HttpMessage, defaultHeaders: List[HttpHeader]): Unit = {}
- @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
- def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
- val modifiedHeaders = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒
- val sprayExtension = Kamon(Spray)(ctx.system)
+ @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)")
+ def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = {
- if (sprayExtension.includeTraceToken)
- RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
- else
- defaultHeaders
+ val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val sprayExtension = Kamon(Spray)(system)
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ else
+ defaultHeaders
- case EmptyTraceContext ⇒ defaultHeaders
- }
+ } getOrElse (defaultHeaders)
- pjp.proceed(Array(modifiedHeaders))
+ pjp.proceed(Array[AnyRef](request, modifiedHeaders))
}
}
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
index 74d98564..eb25412b 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -39,7 +39,7 @@ class ServerRequestInstrumentation {
val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
val sprayExtension = Kamon(Spray)(system)
- val defaultTraceName: String = request.method.value + ": " + request.uri.path
+ val defaultTraceName = sprayExtension.generateTraceName(request)
val token = if (sprayExtension.includeTraceToken) {
request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value)
} else None
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index fbf69c8a..57f9ebe1 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -18,22 +18,23 @@ package kamon.spray
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.ActorSystem
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.trace.{ SegmentMetricIdentity, TraceRecorder }
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
import kamon.Kamon
import kamon.metric.{ TraceMetrics, Metrics }
-import spray.client.pipelining
+import spray.client.pipelining.sendReceive
import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
-import akka.pattern.pipe
import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
+class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ScalaFutures with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
"""
|akka {
@@ -41,8 +42,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
|}
|
|kamon {
+ | spray {
+ | name-generator = kamon.spray.TestSprayNameGenerator
+ | }
+ |
| metrics {
- | tick-interval = 2 seconds
+ | tick-interval = 1 hour
|
| filters = [
| {
@@ -57,19 +62,21 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
""".stripMargin))
implicit def ec = system.dispatcher
+ implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis))
- "the client instrumentation" when {
- "configured to do automatic-trace-token-propagation" should {
- "include the trace token header on spray-client requests" in {
+ "the spray client instrumentation" when {
+ "using the request-level api" should {
+ "include the trace token header if automatic-trace-token-propagation is enabled" in {
enableAutomaticTraceTokenPropagation()
-
- val (hostConnector, server) = buildSHostConnectorAndServer
- val client = TestProbe()
+ val (_, server, bound) = buildSHostConnectorAndServer
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
- client.send(hostConnector, Get("/dummy-path"))
- TraceRecorder.currentContext
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("include-trace-token-header-at-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
+ }
+
+ (TraceRecorder.currentContext, rF)
}
// Accept the connection at the server side
@@ -82,20 +89,105 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
+ responseFuture.futureValue.entity.asString should be("ok")
testContext.finish()
}
- }
- "not configured to do automatic-trace-token-propagation" should {
- "not include the trace token header on spray-client requests" in {
+ "not include the trace token header if automatic-trace-token-propagation is disabled" in {
disableAutomaticTraceTokenPropagation()
+ val (_, server, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("do-not-include-trace-token-header-at-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/dummy-path")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
+ server.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+ }
+
+ "start and finish a segment that must be named using the request level api name assignation" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val transport = TestProbe()
+ val (_, _, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("assign-name-to-segment-with-request-level-api") {
+ val rF = sendReceive(transport.ref)(ec, 10.seconds) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
+
+ // Receive the request and reply back
+ transport.expectMsgType[HttpRequest]
+ transport.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("request-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
+ }
+
+ "rename a request level api segment once it reaches the relevant host connector" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val (_, server, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = TraceRecorder.withNewTraceContext("rename-segment-with-request-level-api") {
+ val rF = sendReceive(system, ec) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (TraceRecorder.currentContext, rF)
+ }
- val (hostConnector, server) = buildSHostConnectorAndServer
+ // Accept the connection at the server side
+ server.expectMsgType[Http.Connected]
+ server.reply(Http.Register(server.ref))
+
+ // Receive the request and reply back
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ responseFuture.futureValue.entity.asString should be("ok")
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("rename-segment-with-request-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /request-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
+ }
+ }
+
+ "using the host-level api" should {
+ "include the trace token header on spray-client requests if automatic-trace-token-propagation is enabled" in {
+ enableAutomaticTraceTokenPropagation()
+ enableInternalSegmentCollectionStrategy()
+
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ val testContext = TraceRecorder.withNewTraceContext("include-trace-token-header-on-http-client-request") {
client.send(hostConnector, Get("/dummy-path"))
TraceRecorder.currentContext
}
@@ -106,30 +198,24 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
testContext.finish()
}
- }
- "configured to use pipelining segment collection strategy" should {
- "open a segment when sendReceive is called and close it when the resulting Future[HttpResponse] is completed" in {
- enablePipeliningSegmentCollectionStrategy()
+ "not include the trace token header on spray-client requests if automatic-trace-token-propagation is disabled" in {
+ disableAutomaticTraceTokenPropagation()
+ enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("pipelining-strategy-client-request") {
- pipeline(Get("/dummy-path")) to client.ref
+ val testContext = TraceRecorder.withNewTraceContext("not-include-trace-token-header-on-http-client-request") {
+ client.send(hostConnector, Get("/dummy-path"))
TraceRecorder.currentContext
}
@@ -138,39 +224,25 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
server.reply(Http.Register(server.ref))
// Receive the request and reply back
- val req = server.expectMsgType[HttpRequest]
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
-
- // Finish the trace
testContext.finish()
-
- val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
- traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
- traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2)
- recordedSegment should not be empty
- recordedSegment map { segmentMetrics ⇒
- segmentMetrics.numberOfMeasurements should be(1L)
- }
}
- }
- "configured to use internal segment collection strategy" should {
- "open a segment upon reception of a request by the HttpHostConnector and close it when sending the response" in {
+ "start and finish a segment that must be named using the host level api name assignation" in {
+ disableAutomaticTraceTokenPropagation()
enableInternalSegmentCollectionStrategy()
- val (hostConnector, server) = buildSHostConnectorAndServer
+ val (hostConnector, server, _) = buildSHostConnectorAndServer
val client = TestProbe()
- val pipeline = pipelining.sendReceive(hostConnector)(system.dispatcher, 3 seconds)
-
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
// Initiate a request within the context of a trace
- val testContext = TraceRecorder.withNewTraceContext("internal-strategy-client-request") {
- pipeline(Get("/dummy-path")) to client.ref
+ val testContext = TraceRecorder.withNewTraceContext("create-segment-with-host-level-api") {
+ client.send(hostConnector, Get("/host-level-api-segment"))
TraceRecorder.currentContext
}
@@ -179,21 +251,17 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
server.reply(Http.Register(server.ref))
// Receive the request and reply back
- server.expectMsgType[HttpRequest]
+ val request = server.expectMsgType[HttpRequest]
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
+
+ // Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
-
- // Finish the trace
testContext.finish()
- val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
- traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
- traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2)
- recordedSegment should not be empty
- recordedSegment map { segmentMetrics ⇒
- segmentMetrics.numberOfMeasurements should be(1L)
- }
+ val traceMetricsSnapshot = takeSnapshotOf("create-segment-with-host-level-api")
+ traceMetricsSnapshot.elapsedTime.numberOfMeasurements should be(1)
+ traceMetricsSnapshot.segments(SegmentMetricIdentity("host-level /host-level-api-segment", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
}
}
@@ -208,6 +276,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
metricsOption.get.asInstanceOf[TraceMetricsSnapshot]
}
+ def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
+ val recorder = Kamon(Metrics).register(TraceMetrics(traceName), TraceMetrics.Factory)
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
+ recorder.get.collect(collectionContext)
+ }
+
def enableInternalSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Internal)
def enablePipeliningSegmentCollectionStrategy(): Unit = setSegmentCollectionStrategy(ClientSegmentCollectionStrategy.Pipelining)
def enableAutomaticTraceTokenPropagation(): Unit = setIncludeTraceToken(true)
@@ -227,3 +301,9 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
field.set(target, include)
}
}
+
+class TestSprayNameGenerator extends SprayNameGenerator {
+ def generateTraceName(request: HttpRequest): String = request.uri.path.toString()
+ def generateRequestLevelApiSegmentName(request: HttpRequest): String = "request-level " + request.uri.path.toString()
+ def generateHostLevelApiSegmentName(request: HttpRequest): String = "host-level " + request.uri.path.toString()
+}
diff --git a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
index 65506770..379b8fc8 100644
--- a/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/TestServer.scala
@@ -45,13 +45,13 @@ trait TestServer {
probe.sender
}
- def buildSHostConnectorAndServer: (ActorRef, TestProbe) = {
+ def buildSHostConnectorAndServer: (ActorRef, TestProbe, Http.Bound) = {
val serverHandler = TestProbe()
IO(Http).tell(Http.Bind(listener = serverHandler.ref, interface = "127.0.0.1", port = 0), serverHandler.ref)
val bound = serverHandler.expectMsgType[Bound](10 seconds)
val client = httpHostConnector(bound)
- (client, serverHandler)
+ (client, serverHandler, bound)
}
private def httpHostConnector(connectionInfo: Http.Bound): ActorRef = {