diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-10-26 02:21:11 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-10-26 02:21:11 +0200 |
commit | cd8dce169b4231bf533445656bfb5a35034a6304 (patch) | |
tree | 4561bf3b8b7890891990a0d1b500155975d54277 | |
parent | ea1a0d5d76988992227eb30b0baaf8e97678c946 (diff) | |
download | Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.tar.gz Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.tar.bz2 Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.zip |
= all: upgrade to be compatible with the latest code in core
16 files changed, 164 insertions, 169 deletions
diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala index 3278ec67..560008cf 100644 --- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala +++ b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala @@ -31,12 +31,15 @@ class RemotingInstrumentation { envelopeBuilder.setMessage(serializedMessage) // Attach the TraceContext info, if available. - TraceRecorder.currentContext.foreach { context ⇒ + if (!TraceRecorder.currentContext.isEmpty) { + val context = TraceRecorder.currentContext + val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.nanoTimestamp) / 1000000) + envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() .setTraceName(context.name) .setTraceToken(context.token) .setIsOpen(context.isOpen) - .setStartMilliTime(context.startMilliTime) + .setStartMilliTime(relativeStartMilliTime) .build()) } @@ -81,14 +84,14 @@ class RemotingInstrumentation { if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext val system = provider.guardian.underlying.system - val tc = TraceRecorder.joinRemoteTraceContext( + val ctx = TraceRecorder.joinRemoteTraceContext( remoteTraceContext.getTraceName(), remoteTraceContext.getTraceToken(), remoteTraceContext.getStartMilliTime(), remoteTraceContext.getIsOpen(), system) - TraceRecorder.setContext(Some(tc)) + TraceRecorder.setContext(ctx) } pjp.proceed() diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala index 29276dd0..8a3973ca 100644 --- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala +++ b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala @@ -127,14 +127,13 @@ class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends case "fail" ⇒ throw new ArithmeticException("Division by zero.") case "reply-trace-token" ⇒ - log.info("Sending back the TT: " + TraceRecorder.currentContext.map(_.token).getOrElse("unavailable")) + log.info("Sending back the TT: " + TraceRecorder.currentContext.token) sender ! currentTraceContextInfo } def currentTraceContextInfo: String = { - TraceRecorder.currentContext.map { context ⇒ - s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}" - }.getOrElse("unavailable") + val ctx = TraceRecorder.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" } } @@ -162,8 +161,7 @@ class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) } def currentTraceContextInfo: String = { - TraceRecorder.currentContext.map { context ⇒ - s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}" - }.getOrElse("unavailable") + val ctx = TraceRecorder.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" } } diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index 54626b6c..7246ccb5 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -30,7 +30,6 @@ object TraceMetrics extends MetricGroupCategory { val name = "trace" case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } - case class HttpClientRequest(name: String) extends MetricIdentity case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) extends MetricGroupRecorder { diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index a5855308..08289acf 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -35,7 +35,9 @@ sealed trait TraceContext { def origin: TraceContextOrigin def isOpen: Boolean def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty def startSegment(segmentName: String, label: String): Segment + def nanoTimestamp: Long } sealed trait Segment { @@ -43,6 +45,7 @@ sealed trait Segment { def rename(newName: String): Unit def label: String def finish(): Unit + def isEmpty: Boolean } case object EmptyTraceContext extends TraceContext { @@ -54,23 +57,25 @@ case object EmptyTraceContext extends TraceContext { def isOpen: Boolean = false def isEmpty: Boolean = true def startSegment(segmentName: String, label: String): Segment = EmptySegment + def nanoTimestamp: Long = 0L case object EmptySegment extends Segment { val name: String = "empty-segment" val label: String = "empty-label" + def isEmpty: Boolean = true def rename(newName: String): Unit = {} def finish: Unit = {} } } class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, startNanoTime: Long)(implicit system: ActorSystem) extends TraceContext { + val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { val isEmpty: Boolean = false @volatile private var _name = traceName @volatile private var _isOpen = izOpen - private val _startNanoTime = startNanoTime + private val _nanoTimestamp = nanoTimeztamp private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() private val metricsExtension = Kamon(Metrics)(system) private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage @@ -80,10 +85,11 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. def isOpen: Boolean = _isOpen + def nanoTimestamp: Long = _nanoTimestamp def finish(): Unit = { _isOpen = false - val elapsedNanoTime = System.nanoTime() - _startNanoTime + val elapsedNanoTime = System.nanoTime() - _nanoTimestamp val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ @@ -119,6 +125,7 @@ class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, def name: String = _segmentName def rename(newName: String): Unit = _segmentName = newName + def isEmpty: Boolean = false def finish: Unit = { val segmentFinishNanoTime = System.nanoTime() diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 9b0ba038..8da187cb 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -36,23 +36,25 @@ object TraceRecorder { private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { new DefaultTraceContext( - name, token.getOrElse(newToken), + name, + token.getOrElse(newToken), izOpen = true, LevelOfDetail.OnlyMetrics, TraceContextOrigin.Local, - startNanoTime = System.nanoTime)(system) + nanoTimeztamp = System.nanoTime, + system) } def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - /*new SimpleMetricCollectionContext( + val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) + new DefaultTraceContext( traceName, traceToken, - Map.empty, + isOpen, + LevelOfDetail.OnlyMetrics, TraceContextOrigin.Remote, - system, - startMilliTime, - isOpen)*/ - ??? + equivalentNanotime, + system) } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) @@ -61,10 +63,9 @@ object TraceRecorder { def currentContext: TraceContext = traceContextStorage.get() - // TODO: Remove this method. def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = { - //val ctx = newTraceContext(name, token, metadata, system) - //traceContextStorage.set(Some(ctx)) + val ctx = newTraceContext(name, token, system) + traceContextStorage.set(ctx) } def rename(name: String): Unit = currentContext.rename(name) @@ -79,6 +80,11 @@ object TraceRecorder { try thunk finally setContext(oldContext) } + def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { + case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) + case EmptyTraceContext ⇒ None + } + def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] def finish(): Unit = currentContext.finish() diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 4bb0ad3a..08b5df99 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -34,10 +34,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging { val params = new java.util.HashMap[String, String]() val ctx = error.asInstanceOf[TraceContextAware].traceContext - - for (c ← ctx) { - params.put("TraceToken", c.token) - } + params.put("TraceToken", ctx.token) if (error.cause == Error.NoCause) { NR.noticeError(error.message.toString, params) diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala index 92686ff0..e2ffd3f9 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/LoggerLikeInstrumentation.scala @@ -15,7 +15,7 @@ package kamon.play.instrumentation -import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder } +import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.slf4j.MDC @@ -52,21 +52,24 @@ class LoggerLikeInstrumentation { object LoggerLikeInstrumentation { @inline final def withMDC[A](block: ⇒ A): A = { - val keys = TraceRecorder.currentContext.map(extractProperties).map(putAndExtractKeys) + val keys = putAndExtractKeys(extractProperties(TraceRecorder.currentContext)) - try block finally keys.map(k ⇒ k.foreach(MDC.remove(_))) + try block finally keys.foreach(k ⇒ MDC.remove(k)) } def putAndExtractKeys(values: Iterable[Map[String, Any]]): Iterable[String] = values.map { value ⇒ value.map { case (key, value) ⇒ MDC.put(key, value.toString); key } }.flatten - def extractProperties(ctx: TraceContext): Iterable[Map[String, Any]] = ctx.traceLocalStorage.underlyingStorage.values.map { - case traceLocalValue @ (p: Product) ⇒ { - val properties = p.productIterator - traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap - } - case anything ⇒ Map.empty[String, Any] + def extractProperties(traceContext: TraceContext): Iterable[Map[String, Any]] = traceContext match { + case ctx: DefaultTraceContext ⇒ + ctx.traceLocalStorage.underlyingStorage.values.collect { + case traceLocalValue @ (p: Product) ⇒ { + val properties = p.productIterator + traceLocalValue.getClass.getDeclaredFields.filter(field ⇒ field.getName != "$outer").map(_.getName -> properties.next).toMap + } + } + case EmptyTraceContext ⇒ Iterable.empty[Map[String, Any]] } } diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index c761e72f..82a43926 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -54,16 +54,23 @@ class RequestInstrumentation { @Around("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)") def aroundDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = { val essentialAction = (requestHeader: RequestHeader) ⇒ { + // TODO: Move to a Kamon-specific dispatcher. val executor = Kamon(Play)(Akka.system()).defaultDispatcher def onResult(result: Result): Result = { - TraceRecorder.finish() - TraceRecorder.currentContext.map { ctx ⇒ - val playExtension = Kamon(Play)(ctx.system) + + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + ctx.finish() + + val playExtension = Kamon(Play)(system) recordHttpServerMetrics(result.header, ctx.name, playExtension) - if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token) - else result - }.getOrElse(result) + + if (playExtension.includeTraceToken) + result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token) + else + result + + } getOrElse (result) } //override the current trace name @@ -76,8 +83,8 @@ class RequestInstrumentation { } @Before("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") - def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = request.traceContext.map { - ctx ⇒ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(ctx.system)) + def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(system)) } private def recordHttpServerMetrics(header: ResponseHeader, traceName: String, playExtension: PlayExtension): Unit = diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index 87467050..c2eafa2b 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -17,12 +17,10 @@ package kamon.play.instrumentation import kamon.Kamon -import kamon.metric.TraceMetrics.HttpClientRequest import kamon.play.Play -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } -import play.api.libs.ws.ning.NingWSRequest import play.api.libs.ws.{ WSRequest, WSResponse } import scala.concurrent.Future @@ -35,28 +33,13 @@ class WSInstrumentation { @Around("onExecuteRequest(request)") def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = { - - import kamon.play.instrumentation.WSInstrumentation._ - - TraceRecorder.currentContext.map { ctx ⇒ - val executor = Kamon(Play)(ctx.system).defaultDispatcher - val segmentHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request)) + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + val executor = Kamon(Play)(system).defaultDispatcher + val segment = ctx.startSegment(request.url, SegmentMetricIdentityLabel.HttpClient) val response = pjp.proceed().asInstanceOf[Future[WSResponse]] - response.map(result ⇒ segmentHandle.map(_.finish()))(executor) + response.map(result ⇒ segment.finish())(executor) response - }.getOrElse(pjp.proceed()) - } -} - -object WSInstrumentation { - - def uri(request: WSRequest): java.net.URI = request.asInstanceOf[NingWSRequest].builder.build().getURI - - def basicRequestAttributes(request: WSRequest): Map[String, String] = { - Map[String, String]( - "host" -> uri(request).getHost, - "path" -> uri(request).getPath, - "method" -> request.method) + } getOrElse (pjp.proceed()) } }
\ No newline at end of file diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala index 2afd31fd..3feb6246 100644 --- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala @@ -117,7 +117,7 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { "respond to the Async Action with X-Trace-Token and the renamed trace" in { val result = Await.result(route(FakeRequest(GET, "/async-renamed").withHeaders(traceTokenHeader)).get, 10 seconds) - TraceRecorder.currentContext.map(_.name) must be(Some("renamed-trace")) + TraceRecorder.currentContext.name must be("renamed-trace") Some(result.header.headers(traceTokenHeaderName)) must be(expectedToken) } diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala index b72659d2..bf1ead05 100644 --- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala @@ -17,9 +17,9 @@ package kamon.play import kamon.Kamon -import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } +import kamon.metric.TraceMetrics.TraceMetricsSnapshot import kamon.metric.{ Metrics, TraceMetrics } -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentityLabel, SegmentMetricIdentity, TraceRecorder } import org.scalatest.{ Matchers, WordSpecLike } import org.scalatestplus.play.OneServerPerSuite import play.api.libs.ws.WS @@ -49,7 +49,7 @@ class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPer val snapshot = takeSnapshotOf("GET: /inside") snapshot.elapsedTime.numberOfMeasurements should be(1) snapshot.segments.size should be(1) - snapshot.segments(HttpClientRequest("http://localhost:19001/async")).numberOfMeasurements should be(1) + snapshot.segments(SegmentMetricIdentity("http://localhost:19001/async", SegmentMetricIdentityLabel.HttpClient)).numberOfMeasurements should be(1) } "propagate the TraceContext outside an Action and complete the WS request" in { diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 3e6e982e..878c3c8c 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -21,10 +21,9 @@ import akka.routing.RoundRobinPool import akka.util.Timeout import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.TraceMetrics.HttpClientRequest import kamon.metric._ import kamon.spray.KamonTraceDirectives -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentityLabel, TraceRecorder } import spray.http.{ StatusCodes, Uri } import spray.httpx.RequestBuilding import spray.routing.SimpleRoutingApp @@ -128,9 +127,9 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } ~ path("segment") { complete { - val segment = TraceRecorder.startSegment(HttpClientRequest("hello-world")) + val segment = TraceRecorder.currentContext.startSegment("hello-world", SegmentMetricIdentityLabel.HttpClient) (replier ? "hello").mapTo[String].onComplete { t ⇒ - segment.get.finish() + segment.finish() } "segment" diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index d9cdde08..cfd204df 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -19,9 +19,8 @@ package spray.can.client import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } -import spray.http.HttpHeaders.{ RawHeader, Host } -import kamon.trace.{SegmentAware, TraceRecorder, SegmentCompletionHandleAware} -import kamon.metric.TraceMetrics.HttpClientRequest +import spray.http.HttpHeaders.RawHeader +import kamon.trace._ import kamon.Kamon import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } import akka.actor.ActorRef @@ -34,26 +33,28 @@ class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixin: SegmentAware = SegmentAware.default - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") - def requestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {} + @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") + def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {} - @After("requestContextCreation(ctx, request)") - def afterRequestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = { + @After("requestContextCreation(requestContext, request)") + def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = { // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the - // completion handle the first time we create one. + // segment the first time we create one. // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely. - if (ctx.segmentCompletionHandle.isEmpty) { - TraceRecorder.currentContext.map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) + if (requestContext.segment.isEmpty) { + TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - val requestAttributes = basicRequestAttributes(request) - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + val clientRequestName = sprayExtension.assignHttpClientRequestName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) - ctx.segmentCompletionHandle = Some(completionHandle) - } + requestContext.segment = segment + } + + case EmptyTraceContext ⇒ // Nothing to do here. } } } @@ -73,17 +74,15 @@ class ClientRequestInstrumentation { @Around("dispatchToCommander(requestContext, message)") def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { - requestContext.traceContext match { - case ctx @ Some(_) ⇒ - TraceRecorder.withInlineTraceContextReplacement(ctx) { - if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segment.finish() + if (requestContext.traceContext.nonEmpty) { + TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { + if (message.isInstanceOf[HttpMessageEnd]) + requestContext.segment.finish() - pjp.proceed() - } + pjp.proceed() + } - case None ⇒ pjp.proceed() - } + } else pjp.proceed() } @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") @@ -95,18 +94,21 @@ class ClientRequestInstrumentation { (request: HttpRequest) ⇒ { val responseFuture = originalSendReceive.apply(request) - TraceRecorder.currentContext.map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) - - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { - val requestAttributes = basicRequestAttributes(request) - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) - - responseFuture.onComplete { result ⇒ - completionHandle.finish(Map.empty) - }(ec) - } + + TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) + + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { + val clientRequestName = sprayExtension.assignHttpClientRequestName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) + } + + case EmptyTraceContext ⇒ // Nothing to do here. } responseFuture @@ -114,26 +116,22 @@ class ClientRequestInstrumentation { } - def basicRequestAttributes(request: HttpRequest): Map[String, String] = { - Map[String, String]( - "host" -> request.header[Host].map(_.value).getOrElse("unknown"), - "path" -> request.uri.path.toString(), - "method" -> request.method.toString()) - } - @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)") def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {} @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)") def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) - - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders - else - defaultHeaders - } getOrElse defaultHeaders + val modifiedHeaders = TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) + + if (sprayExtension.includeTraceToken) + RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders + + case EmptyTraceContext ⇒ defaultHeaders + } pjp.proceed(Array(modifiedHeaders)) } diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index 69b0160e..74d98564 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -16,11 +16,10 @@ package spray.can.server import org.aspectj.lang.annotation._ -import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware } +import kamon.trace._ import akka.actor.ActorSystem import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } import akka.event.Logging.Warning -import scala.Some import kamon.Kamon import kamon.spray.{ SprayExtension, Spray } import org.aspectj.lang.ProceedingJoinPoint @@ -67,40 +66,36 @@ class ServerRequestInstrumentation { val incomingContext = TraceRecorder.currentContext val storedContext = openRequest.traceContext - verifyTraceContextConsistency(incomingContext, storedContext) - incomingContext match { - case None ⇒ pjp.proceed() - case Some(traceContext) ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) + // The stored context is always a DefaultTraceContext if the instrumentation is running + val system = storedContext.asInstanceOf[DefaultTraceContext].system - val proceedResult = if (sprayExtension.includeTraceToken) { - val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token) - pjp.proceed(Array(openRequest, responseWithHeader)) + verifyTraceContextConsistency(incomingContext, storedContext, system) - } else pjp.proceed + if (incomingContext.isEmpty) + pjp.proceed() + else { + val sprayExtension = Kamon(Spray)(system) - TraceRecorder.finish() - recordHttpServerMetrics(response, traceContext.name, sprayExtension) - proceedResult - } - } + val proceedResult = if (sprayExtension.includeTraceToken) { + val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token) + pjp.proceed(Array(openRequest, responseWithHeader)) - def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = { - for (original ← storedTraceContext) { - incomingTraceContext match { - case Some(incoming) if original.token != incoming.token ⇒ - publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system) + } else pjp.proceed - case Some(_) ⇒ // nothing to do here. - - case None ⇒ - publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system) - } + TraceRecorder.finish() + recordHttpServerMetrics(response, incomingContext.name, sprayExtension) + proceedResult } + } + def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = { def publishWarning(text: String, system: ActorSystem): Unit = system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text)) + if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token) + publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) + else + publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system) } def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 54329645..fbf69c8a 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -21,7 +21,7 @@ import akka.actor.ActorSystem import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader @@ -31,7 +31,7 @@ import spray.client.pipelining import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ import akka.pattern.pipe -import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } +import kamon.metric.TraceMetrics.TraceMetricsSnapshot class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( @@ -78,12 +78,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token)) + request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - testContext.map(_.finish(Map.empty)) + testContext.finish() } } @@ -106,12 +106,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token)) + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - testContext.map(_.finish(Map.empty)) + testContext.finish() } } @@ -143,12 +143,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit client.expectMsgType[HttpResponse] // Finish the trace - testContext.map(_.finish(Map.empty)) + testContext.finish() val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -184,12 +184,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit client.expectMsgType[HttpResponse] // Finish the trace - testContext.map(_.finish(Map.empty)) + testContext.finish() val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) diff --git a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala index de867035..825cc718 100644 --- a/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala +++ b/kamon-testkit/src/main/scala/testkit/TestProbeInstrumentation.scala @@ -17,7 +17,7 @@ package akka.testkit import org.aspectj.lang.annotation._ -import kamon.trace.{ TraceContextAware, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import akka.testkit.TestActor.RealMessage @@ -43,7 +43,7 @@ class TestProbeInstrumentation { def aroundTestProbeReply(pjp: ProceedingJoinPoint, testProbe: TestProbe): Any = { val traceContext = testProbe.lastMessage match { case msg: RealMessage ⇒ msg.asInstanceOf[TraceContextAware].traceContext - case _ ⇒ None + case _ ⇒ EmptyTraceContext } TraceRecorder.withTraceContext(traceContext) { |