aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala11
-rw-r--r--kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala1
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala28
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala5
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala21
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala23
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala29
-rw-r--r--kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala2
-rw-r--r--kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala6
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala7
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala104
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala47
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala20
-rw-r--r--kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala4
16 files changed, 164 insertions, 169 deletions
diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
index 3278ec67..560008cf 100644
--- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
+++ b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
@@ -31,12 +31,15 @@ class RemotingInstrumentation {
envelopeBuilder.setMessage(serializedMessage)
// Attach the TraceContext info, if available.
- TraceRecorder.currentContext.foreach { context ⇒
+ if (!TraceRecorder.currentContext.isEmpty) {
+ val context = TraceRecorder.currentContext
+ val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.nanoTimestamp) / 1000000)
+
envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder()
.setTraceName(context.name)
.setTraceToken(context.token)
.setIsOpen(context.isOpen)
- .setStartMilliTime(context.startMilliTime)
+ .setStartMilliTime(relativeStartMilliTime)
.build())
}
@@ -81,14 +84,14 @@ class RemotingInstrumentation {
if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) {
val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext
val system = provider.guardian.underlying.system
- val tc = TraceRecorder.joinRemoteTraceContext(
+ val ctx = TraceRecorder.joinRemoteTraceContext(
remoteTraceContext.getTraceName(),
remoteTraceContext.getTraceToken(),
remoteTraceContext.getStartMilliTime(),
remoteTraceContext.getIsOpen(),
system)
- TraceRecorder.setContext(Some(tc))
+ TraceRecorder.setContext(ctx)
}
pjp.proceed()
diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
index 29276dd0..8a3973ca 100644
--- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
+++ b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
@@ -127,14 +127,13 @@ class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends
case "fail" ⇒
throw new ArithmeticException("Division by zero.")
case "reply-trace-token" ⇒
- log.info("Sending back the TT: " + TraceRecorder.currentContext.map(_.token).getOrElse("unavailable"))
+ log.info("Sending back the TT: " + TraceRecorder.currentContext.token)
sender ! currentTraceContextInfo
}
def currentTraceContextInfo: String = {
- TraceRecorder.currentContext.map { context ⇒
- s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}"
- }.getOrElse("unavailable")
+ val ctx = TraceRecorder.currentContext
+ s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}"
}
}
@@ -162,8 +161,7 @@ class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address)
}
def currentTraceContextInfo: String = {
- TraceRecorder.currentContext.map { context ⇒
- s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}"
- }.getOrElse("unavailable")
+ val ctx = TraceRecorder.currentContext
+ s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}"
}
}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
index 54626b6c..7246ccb5 100644
--- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -30,7 +30,6 @@ object TraceMetrics extends MetricGroupCategory {
val name = "trace"
case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
- case class HttpClientRequest(name: String) extends MetricIdentity
case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram)
extends MetricGroupRecorder {
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index a5855308..08289acf 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -35,7 +35,9 @@ sealed trait TraceContext {
def origin: TraceContextOrigin
def isOpen: Boolean
def isEmpty: Boolean
+ def nonEmpty: Boolean = !isEmpty
def startSegment(segmentName: String, label: String): Segment
+ def nanoTimestamp: Long
}
sealed trait Segment {
@@ -43,6 +45,7 @@ sealed trait Segment {
def rename(newName: String): Unit
def label: String
def finish(): Unit
+ def isEmpty: Boolean
}
case object EmptyTraceContext extends TraceContext {
@@ -54,23 +57,25 @@ case object EmptyTraceContext extends TraceContext {
def isOpen: Boolean = false
def isEmpty: Boolean = true
def startSegment(segmentName: String, label: String): Segment = EmptySegment
+ def nanoTimestamp: Long = 0L
case object EmptySegment extends Segment {
val name: String = "empty-segment"
val label: String = "empty-label"
+ def isEmpty: Boolean = true
def rename(newName: String): Unit = {}
def finish: Unit = {}
}
}
class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
- val origin: TraceContextOrigin, startNanoTime: Long)(implicit system: ActorSystem) extends TraceContext {
+ val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext {
val isEmpty: Boolean = false
@volatile private var _name = traceName
@volatile private var _isOpen = izOpen
- private val _startNanoTime = startNanoTime
+ private val _nanoTimestamp = nanoTimeztamp
private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
private val metricsExtension = Kamon(Metrics)(system)
private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
@@ -80,10 +85,11 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean,
if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace.
def isOpen: Boolean = _isOpen
+ def nanoTimestamp: Long = _nanoTimestamp
def finish(): Unit = {
_isOpen = false
- val elapsedNanoTime = System.nanoTime() - _startNanoTime
+ val elapsedNanoTime = System.nanoTime() - _nanoTimestamp
val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
@@ -119,6 +125,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean,
def name: String = _segmentName
def rename(newName: String): Unit = _segmentName = newName
+ def isEmpty: Boolean = false
def finish: Unit = {
val segmentFinishNanoTime = System.nanoTime()
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index 9b0ba038..8da187cb 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -36,23 +36,25 @@ object TraceRecorder {
private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = {
new DefaultTraceContext(
- name, token.getOrElse(newToken),
+ name,
+ token.getOrElse(newToken),
izOpen = true,
LevelOfDetail.OnlyMetrics,
TraceContextOrigin.Local,
- startNanoTime = System.nanoTime)(system)
+ nanoTimeztamp = System.nanoTime,
+ system)
}
def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = {
- /*new SimpleMetricCollectionContext(
+ val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000)
+ new DefaultTraceContext(
traceName,
traceToken,
- Map.empty,
+ isOpen,
+ LevelOfDetail.OnlyMetrics,
TraceContextOrigin.Remote,
- system,
- startMilliTime,
- isOpen)*/
- ???
+ equivalentNanotime,
+ system)
}
def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
@@ -61,10 +63,9 @@ object TraceRecorder {
def currentContext: TraceContext = traceContextStorage.get()
- // TODO: Remove this method.
def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = {
- //val ctx = newTraceContext(name, token, metadata, system)
- //traceContextStorage.set(Some(ctx))
+ val ctx = newTraceContext(name, token, system)
+ traceContextStorage.set(ctx)
}
def rename(name: String): Unit = currentContext.rename(name)
@@ -79,6 +80,11 @@ object TraceRecorder {
try thunk finally setContext(oldContext)
}
+ def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match {
+ case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system))
+ case EmptyTraceContext ⇒ None
+ }
+
def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext]
def finish(): Unit = currentContext.finish()
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
index 4bb0ad3a..08b5df99 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
@@ -34,10 +34,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging {
val params = new java.util.HashMap[String, String]()
val ctx = error.asInstanceOf[TraceContextAware].traceContext
-
- for (c ← ctx) {
- params.put("TraceToken", c.token)
- }
+ params.put("TraceToken", ctx.token)
if (error.cause == Error.NoCause) {
NR.noticeError(error.message.toString, params)
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala
index 92686ff0..e2ffd3f9 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala
@@ -15,7 +15,7 @@
package kamon.play.instrumentation
-import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder }
+import kamon.trace._
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
import org.slf4j.MDC
@@ -52,21 +52,24 @@ class LoggerLikeInstrumentation {
object LoggerLikeInstrumentation {
@inline final def withMDC[A](block: ⇒ A): A = {
- val keys = TraceRecorder.currentContext.map(extractProperties).map(putAndExtractKeys)
+ val keys = putAndExtractKeys(extractProperties(TraceRecorder.currentContext))
- try block finally keys.map(k ⇒ k.foreach(MDC.remove(_)))
+ try block finally keys.foreach(k ⇒ MDC.remove(k))
}
def putAndExtractKeys(values: Iterable[Map[String, Any]]): Iterable[String] = values.map {
value ⇒ value.map { case (key, value) ⇒ MDC.put(key, value.toString); key }
}.flatten
- def extractProperties(ctx: TraceContext): Iterable[Map[String, Any]] = ctx.traceLocalStorage.underlyingStorage.values.map {
- case traceLocalValue @ (p: Product) ⇒ {
- val properties = p.productIterator
- traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap
- }
- case anything ⇒ Map.empty[String, Any]
+ def extractProperties(traceContext: TraceContext): Iterable[Map[String, Any]] = traceContext match {
+ case ctx: DefaultTraceContext ⇒
+ ctx.traceLocalStorage.underlyingStorage.values.collect {
+ case traceLocalValue @ (p: Product) ⇒ {
+ val properties = p.productIterator
+ traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap
+ }
+ }
+ case EmptyTraceContext ⇒ Iterable.empty[Map[String, Any]]
}
}
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
index c761e72f..82a43926 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
@@ -54,16 +54,23 @@ class RequestInstrumentation {
@Around("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)")
def aroundDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = {
val essentialAction = (requestHeader: RequestHeader) ⇒ {
+ // TODO: Move to a Kamon-specific dispatcher.
val executor = Kamon(Play)(Akka.system()).defaultDispatcher
def onResult(result: Result): Result = {
- TraceRecorder.finish()
- TraceRecorder.currentContext.map { ctx ⇒
- val playExtension = Kamon(Play)(ctx.system)
+
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ ctx.finish()
+
+ val playExtension = Kamon(Play)(system)
recordHttpServerMetrics(result.header, ctx.name, playExtension)
- if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token)
- else result
- }.getOrElse(result)
+
+ if (playExtension.includeTraceToken)
+ result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token)
+ else
+ result
+
+ } getOrElse (result)
}
//override the current trace name
@@ -76,8 +83,8 @@ class RequestInstrumentation {
}
@Before("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)")
- def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = request.traceContext.map {
- ctx ⇒ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(ctx.system))
+ def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(system))
}
private def recordHttpServerMetrics(header: ResponseHeader, traceName: String, playExtension: PlayExtension): Unit =
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
index 87467050..c2eafa2b 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
@@ -17,12 +17,10 @@
package kamon.play.instrumentation
import kamon.Kamon
-import kamon.metric.TraceMetrics.HttpClientRequest
import kamon.play.Play
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
-import play.api.libs.ws.ning.NingWSRequest
import play.api.libs.ws.{ WSRequest, WSResponse }
import scala.concurrent.Future
@@ -35,28 +33,13 @@ class WSInstrumentation {
@Around("onExecuteRequest(request)")
def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = {
-
- import kamon.play.instrumentation.WSInstrumentation._
-
- TraceRecorder.currentContext.map { ctx ⇒
- val executor = Kamon(Play)(ctx.system).defaultDispatcher
- val segmentHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request))
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+ val executor = Kamon(Play)(system).defaultDispatcher
+ val segment = ctx.startSegment(request.url, SegmentMetricIdentityLabel.HttpClient)
val response = pjp.proceed().asInstanceOf[Future[WSResponse]]
- response.map(result ⇒ segmentHandle.map(_.finish()))(executor)
+ response.map(result ⇒ segment.finish())(executor)
response
- }.getOrElse(pjp.proceed())
- }
-}
-
-object WSInstrumentation {
-
- def uri(request: WSRequest): java.net.URI = request.asInstanceOf[NingWSRequest].builder.build().getURI
-
- def basicRequestAttributes(request: WSRequest): Map[String, String] = {
- Map[String, String](
- "host" -> uri(request).getHost,
- "path" -> uri(request).getPath,
- "method" -> request.method)
+ } getOrElse (pjp.proceed())
}
} \ No newline at end of file
diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
index 2afd31fd..3feb6246 100644
--- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
@@ -117,7 +117,7 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
"respond to the Async Action with X-Trace-Token and the renamed trace" in {
val result = Await.result(route(FakeRequest(GET, "/async-renamed").withHeaders(traceTokenHeader)).get, 10 seconds)
- TraceRecorder.currentContext.map(_.name) must be(Some("renamed-trace"))
+ TraceRecorder.currentContext.name must be("renamed-trace")
Some(result.header.headers(traceTokenHeaderName)) must be(expectedToken)
}
diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
index b72659d2..bf1ead05 100644
--- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
@@ -17,9 +17,9 @@
package kamon.play
import kamon.Kamon
-import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
import kamon.metric.{ Metrics, TraceMetrics }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder }
import org.scalatest.{ Matchers, WordSpecLike }
import org.scalatestplus.play.OneServerPerSuite
import play.api.libs.ws.WS
@@ -49,7 +49,7 @@ class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPer
val snapshot = takeSnapshotOf("GET: /inside")
snapshot.elapsedTime.numberOfMeasurements should be(1)
snapshot.segments.size should be(1)
- snapshot.segments(HttpClientRequest("http://localhost:19001/async")).numberOfMeasurements should be(1)
+ snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1)
}
"propagate the TraceContext outside an Action and complete the WS request" in {
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index 3e6e982e..878c3c8c 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -21,10 +21,9 @@ import akka.routing.RoundRobinPool
import akka.util.Timeout
import kamon.Kamon
import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.TraceMetrics.HttpClientRequest
import kamon.metric._
import kamon.spray.KamonTraceDirectives
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentityLabel, TraceRecorder }
import spray.http.{ StatusCodes, Uri }
import spray.httpx.RequestBuilding
import spray.routing.SimpleRoutingApp
@@ -128,9 +127,9 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
} ~
path("segment") {
complete {
- val segment = TraceRecorder.startSegment(HttpClientRequest("hello-world"))
+ val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentMetricIdentityLabel.HttpClient)
(replier ? "hello").mapTo[String].onComplete { t ⇒
- segment.get.finish()
+ segment.finish()
}
"segment"
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
index d9cdde08..cfd204df 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala
@@ -19,9 +19,8 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest }
-import spray.http.HttpHeaders.{ RawHeader, Host }
-import kamon.trace.{SegmentAware, TraceRecorder, SegmentCompletionHandleAware}
-import kamon.metric.TraceMetrics.HttpClientRequest
+import spray.http.HttpHeaders.RawHeader
+import kamon.trace._
import kamon.Kamon
import kamon.spray.{ ClientSegmentCollectionStrategy, Spray }
import akka.actor.ActorRef
@@ -34,26 +33,28 @@ class ClientRequestInstrumentation {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
def mixin: SegmentAware = SegmentAware.default
- @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
- def requestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {}
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)")
+ def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {}
- @After("requestContextCreation(ctx, request)")
- def afterRequestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {
+ @After("requestContextCreation(requestContext, request)")
+ def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {
// The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
- // completion handle the first time we create one.
+ // segment the first time we create one.
// The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely.
- if (ctx.segmentCompletionHandle.isEmpty) {
- TraceRecorder.currentContext.map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ if (requestContext.segment.isEmpty) {
+ TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒
+ val sprayExtension = Kamon(Spray)(ctx.system)
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) {
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
- ctx.segmentCompletionHandle = Some(completionHandle)
- }
+ requestContext.segment = segment
+ }
+
+ case EmptyTraceContext ⇒ // Nothing to do here.
}
}
}
@@ -73,17 +74,15 @@ class ClientRequestInstrumentation {
@Around("dispatchToCommander(requestContext, message)")
def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = {
- requestContext.traceContext match {
- case ctx @ Some(_) ⇒
- TraceRecorder.withInlineTraceContextReplacement(ctx) {
- if (message.isInstanceOf[HttpMessageEnd])
- requestContext.segment.finish()
+ if (requestContext.traceContext.nonEmpty) {
+ TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) {
+ if (message.isInstanceOf[HttpMessageEnd])
+ requestContext.segment.finish()
- pjp.proceed()
- }
+ pjp.proceed()
+ }
- case None ⇒ pjp.proceed()
- }
+ } else pjp.proceed()
}
@Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)")
@@ -95,18 +94,21 @@ class ClientRequestInstrumentation {
(request: HttpRequest) ⇒ {
val responseFuture = originalSendReceive.apply(request)
- TraceRecorder.currentContext.map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
-
- if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
- val requestAttributes = basicRequestAttributes(request)
- val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
- val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
-
- responseFuture.onComplete { result ⇒
- completionHandle.finish(Map.empty)
- }(ec)
- }
+
+ TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒
+ val sprayExtension = Kamon(Spray)(ctx.system)
+
+ if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) {
+ val clientRequestName = sprayExtension.assignHttpClientRequestName(request)
+ val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient)
+
+ responseFuture.onComplete { result ⇒
+ segment.finish()
+ }(ec)
+ }
+
+ case EmptyTraceContext ⇒ // Nothing to do here.
}
responseFuture
@@ -114,26 +116,22 @@ class ClientRequestInstrumentation {
}
- def basicRequestAttributes(request: HttpRequest): Map[String, String] = {
- Map[String, String](
- "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
- "path" -> request.uri.path.toString(),
- "method" -> request.method.toString())
- }
-
@Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)")
def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {}
@Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)")
def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = {
- val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
-
- if (sprayExtension.includeTraceToken)
- RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders
- else
- defaultHeaders
- } getOrElse defaultHeaders
+ val modifiedHeaders = TraceRecorder.currentContext match {
+ case ctx: DefaultTraceContext ⇒
+ val sprayExtension = Kamon(Spray)(ctx.system)
+
+ if (sprayExtension.includeTraceToken)
+ RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders
+ else
+ defaultHeaders
+
+ case EmptyTraceContext ⇒ defaultHeaders
+ }
pjp.proceed(Array(modifiedHeaders))
}
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
index 69b0160e..74d98564 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -16,11 +16,10 @@
package spray.can.server
import org.aspectj.lang.annotation._
-import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware }
+import kamon.trace._
import akka.actor.ActorSystem
import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
import akka.event.Logging.Warning
-import scala.Some
import kamon.Kamon
import kamon.spray.{ SprayExtension, Spray }
import org.aspectj.lang.ProceedingJoinPoint
@@ -67,40 +66,36 @@ class ServerRequestInstrumentation {
val incomingContext = TraceRecorder.currentContext
val storedContext = openRequest.traceContext
- verifyTraceContextConsistency(incomingContext, storedContext)
- incomingContext match {
- case None ⇒ pjp.proceed()
- case Some(traceContext) ⇒
- val sprayExtension = Kamon(Spray)(traceContext.system)
+ // The stored context is always a DefaultTraceContext if the instrumentation is running
+ val system = storedContext.asInstanceOf[DefaultTraceContext].system
- val proceedResult = if (sprayExtension.includeTraceToken) {
- val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token)
- pjp.proceed(Array(openRequest, responseWithHeader))
+ verifyTraceContextConsistency(incomingContext, storedContext, system)
- } else pjp.proceed
+ if (incomingContext.isEmpty)
+ pjp.proceed()
+ else {
+ val sprayExtension = Kamon(Spray)(system)
- TraceRecorder.finish()
- recordHttpServerMetrics(response, traceContext.name, sprayExtension)
- proceedResult
- }
- }
+ val proceedResult = if (sprayExtension.includeTraceToken) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token)
+ pjp.proceed(Array(openRequest, responseWithHeader))
- def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = {
- for (original ← storedTraceContext) {
- incomingTraceContext match {
- case Some(incoming) if original.token != incoming.token ⇒
- publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system)
+ } else pjp.proceed
- case Some(_) ⇒ // nothing to do here.
-
- case None ⇒
- publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system)
- }
+ TraceRecorder.finish()
+ recordHttpServerMetrics(response, incomingContext.name, sprayExtension)
+ proceedResult
}
+ }
+ def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = {
def publishWarning(text: String, system: ActorSystem): Unit =
system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text))
+ if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token)
+ publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system)
+ else
+ publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system)
}
def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit =
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index 54329645..fbf69c8a 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -21,7 +21,7 @@ import akka.actor.ActorSystem
import org.scalatest.{ Matchers, WordSpecLike }
import spray.httpx.RequestBuilding
import spray.http.{ HttpResponse, HttpRequest }
-import kamon.trace.TraceRecorder
+import kamon.trace.{ SegmentMetricIdentity, TraceRecorder }
import com.typesafe.config.ConfigFactory
import spray.can.Http
import spray.http.HttpHeaders.RawHeader
@@ -31,7 +31,7 @@ import spray.client.pipelining
import kamon.metric.Subscriptions.TickMetricSnapshot
import scala.concurrent.duration._
import akka.pattern.pipe
-import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer {
implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString(
@@ -78,12 +78,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
}
}
@@ -106,12 +106,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
// Receive the request and reply back
val request = server.expectMsgType[HttpRequest]
- request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token))
+ request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token))
// Finish the request cycle, just to avoid error messages on the logs.
server.reply(HttpResponse(entity = "ok"))
client.expectMsgType[HttpResponse]
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
}
}
@@ -143,12 +143,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
client.expectMsgType[HttpResponse]
// Finish the trace
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds)
traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2)
recordedSegment should not be empty
recordedSegment map { segmentMetrics ⇒
segmentMetrics.numberOfMeasurements should be(1L)
@@ -184,12 +184,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit
client.expectMsgType[HttpResponse]
// Finish the trace
- testContext.map(_.finish(Map.empty))
+ testContext.finish()
val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds)
traceMetrics.elapsedTime.numberOfMeasurements should be(1L)
traceMetrics.segments should not be empty
- val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2)
+ val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2)
recordedSegment should not be empty
recordedSegment map { segmentMetrics ⇒
segmentMetrics.numberOfMeasurements should be(1L)
diff --git a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
index de867035..825cc718 100644
--- a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
+++ b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala
@@ -17,7 +17,7 @@
package akka.testkit
import org.aspectj.lang.annotation._
-import kamon.trace.{ TraceContextAware, TraceRecorder }
+import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import akka.testkit.TestActor.RealMessage
@@ -43,7 +43,7 @@ class TestProbeInstrumentation {
def aroundTestProbeReply(pjp: ProceedingJoinPoint, testProbe: TestProbe): Any = {
val traceContext = testProbe.lastMessage match {
case msg: RealMessage ⇒ msg.asInstanceOf[TraceContextAware].traceContext
- case _ ⇒ None
+ case _ ⇒ EmptyTraceContext
}
TraceRecorder.withTraceContext(traceContext) {