diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-10-26 02:21:11 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-10-26 02:21:11 +0200 |
commit | cd8dce169b4231bf533445656bfb5a35034a6304 (patch) | |
tree | 4561bf3b8b7890891990a0d1b500155975d54277 /kamon-spray/src | |
parent | ea1a0d5d76988992227eb30b0baaf8e97678c946 (diff) | |
download | Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.tar.gz Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.tar.bz2 Kamon-cd8dce169b4231bf533445656bfb5a35034a6304.zip |
= all: upgrade to be compatible with the latest code in core
Diffstat (limited to 'kamon-spray/src')
3 files changed, 82 insertions, 89 deletions
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index d9cdde08..cfd204df 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -19,9 +19,8 @@ package spray.can.client import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.{ HttpHeader, HttpResponse, HttpMessageEnd, HttpRequest } -import spray.http.HttpHeaders.{ RawHeader, Host } -import kamon.trace.{SegmentAware, TraceRecorder, SegmentCompletionHandleAware} -import kamon.metric.TraceMetrics.HttpClientRequest +import spray.http.HttpHeaders.RawHeader +import kamon.trace._ import kamon.Kamon import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } import akka.actor.ActorRef @@ -34,26 +33,28 @@ class ClientRequestInstrumentation { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixin: SegmentAware = SegmentAware.default - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") - def requestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = {} + @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(requestContext) && args(request, *, *, *)") + def requestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = {} - @After("requestContextCreation(ctx, request)") - def afterRequestContextCreation(ctx: SegmentAware, request: HttpRequest): Unit = { + @After("requestContextCreation(requestContext, request)") + def afterRequestContextCreation(requestContext: SegmentAware, request: HttpRequest): Unit = { // The RequestContext will be copied when a request needs to be retried but we are only interested in creating the - // completion handle the first time we create one. + // segment the first time we create one. // The read to ctx.segmentCompletionHandle should take care of initializing the aspect timely. - if (ctx.segmentCompletionHandle.isEmpty) { - TraceRecorder.currentContext.map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) + if (requestContext.segment.isEmpty) { + TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { - val requestAttributes = basicRequestAttributes(request) - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + val clientRequestName = sprayExtension.assignHttpClientRequestName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) - ctx.segmentCompletionHandle = Some(completionHandle) - } + requestContext.segment = segment + } + + case EmptyTraceContext ⇒ // Nothing to do here. } } } @@ -73,17 +74,15 @@ class ClientRequestInstrumentation { @Around("dispatchToCommander(requestContext, message)") def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentAware, message: Any) = { - requestContext.traceContext match { - case ctx @ Some(_) ⇒ - TraceRecorder.withInlineTraceContextReplacement(ctx) { - if (message.isInstanceOf[HttpMessageEnd]) - requestContext.segment.finish() + if (requestContext.traceContext.nonEmpty) { + TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { + if (message.isInstanceOf[HttpMessageEnd]) + requestContext.segment.finish() - pjp.proceed() - } + pjp.proceed() + } - case None ⇒ pjp.proceed() - } + } else pjp.proceed() } @Pointcut("execution(* spray.client.pipelining$.sendReceive(akka.actor.ActorRef, *, *)) && args(transport, ec, timeout)") @@ -95,18 +94,21 @@ class ClientRequestInstrumentation { (request: HttpRequest) ⇒ { val responseFuture = originalSendReceive.apply(request) - TraceRecorder.currentContext.map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) - - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { - val requestAttributes = basicRequestAttributes(request) - val clientRequestName = sprayExtension.assignHttpClientRequestName(request) - val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes) - - responseFuture.onComplete { result ⇒ - completionHandle.finish(Map.empty) - }(ec) - } + + TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) + + if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) { + val clientRequestName = sprayExtension.assignHttpClientRequestName(request) + val segment = ctx.startSegment(clientRequestName, SegmentMetricIdentityLabel.HttpClient) + + responseFuture.onComplete { result ⇒ + segment.finish() + }(ec) + } + + case EmptyTraceContext ⇒ // Nothing to do here. } responseFuture @@ -114,26 +116,22 @@ class ClientRequestInstrumentation { } - def basicRequestAttributes(request: HttpRequest): Map[String, String] = { - Map[String, String]( - "host" -> request.header[Host].map(_.value).getOrElse("unknown"), - "path" -> request.uri.path.toString(), - "method" -> request.method.toString()) - } - @Pointcut("call(* spray.http.HttpMessage.withDefaultHeaders(*)) && within(spray.can.client.HttpHostConnector) && args(defaultHeaders)") def includingDefaultHeadersAtHttpHostConnector(defaultHeaders: List[HttpHeader]): Unit = {} @Around("includingDefaultHeadersAtHttpHostConnector(defaultHeaders)") def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.currentContext map { traceContext ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) - - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, traceContext.token) :: defaultHeaders - else - defaultHeaders - } getOrElse defaultHeaders + val modifiedHeaders = TraceRecorder.currentContext match { + case ctx: DefaultTraceContext ⇒ + val sprayExtension = Kamon(Spray)(ctx.system) + + if (sprayExtension.includeTraceToken) + RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + else + defaultHeaders + + case EmptyTraceContext ⇒ defaultHeaders + } pjp.proceed(Array(modifiedHeaders)) } diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index 69b0160e..74d98564 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -16,11 +16,10 @@ package spray.can.server import org.aspectj.lang.annotation._ -import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware } +import kamon.trace._ import akka.actor.ActorSystem import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } import akka.event.Logging.Warning -import scala.Some import kamon.Kamon import kamon.spray.{ SprayExtension, Spray } import org.aspectj.lang.ProceedingJoinPoint @@ -67,40 +66,36 @@ class ServerRequestInstrumentation { val incomingContext = TraceRecorder.currentContext val storedContext = openRequest.traceContext - verifyTraceContextConsistency(incomingContext, storedContext) - incomingContext match { - case None ⇒ pjp.proceed() - case Some(traceContext) ⇒ - val sprayExtension = Kamon(Spray)(traceContext.system) + // The stored context is always a DefaultTraceContext if the instrumentation is running + val system = storedContext.asInstanceOf[DefaultTraceContext].system - val proceedResult = if (sprayExtension.includeTraceToken) { - val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token) - pjp.proceed(Array(openRequest, responseWithHeader)) + verifyTraceContextConsistency(incomingContext, storedContext, system) - } else pjp.proceed + if (incomingContext.isEmpty) + pjp.proceed() + else { + val sprayExtension = Kamon(Spray)(system) - TraceRecorder.finish() - recordHttpServerMetrics(response, traceContext.name, sprayExtension) - proceedResult - } - } + val proceedResult = if (sprayExtension.includeTraceToken) { + val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token) + pjp.proceed(Array(openRequest, responseWithHeader)) - def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = { - for (original ← storedTraceContext) { - incomingTraceContext match { - case Some(incoming) if original.token != incoming.token ⇒ - publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system) + } else pjp.proceed - case Some(_) ⇒ // nothing to do here. - - case None ⇒ - publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system) - } + TraceRecorder.finish() + recordHttpServerMetrics(response, incomingContext.name, sprayExtension) + proceedResult } + } + def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = { def publishWarning(text: String, system: ActorSystem): Unit = system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text)) + if (incomingTraceContext.nonEmpty && incomingTraceContext.token != storedTraceContext.token) + publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) + else + publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system) } def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala index 54329645..fbf69c8a 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala @@ -21,7 +21,7 @@ import akka.actor.ActorSystem import org.scalatest.{ Matchers, WordSpecLike } import spray.httpx.RequestBuilding import spray.http.{ HttpResponse, HttpRequest } -import kamon.trace.TraceRecorder +import kamon.trace.{ SegmentMetricIdentity, TraceRecorder } import com.typesafe.config.ConfigFactory import spray.can.Http import spray.http.HttpHeaders.RawHeader @@ -31,7 +31,7 @@ import spray.client.pipelining import kamon.metric.Subscriptions.TickMetricSnapshot import scala.concurrent.duration._ import akka.pattern.pipe -import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } +import kamon.metric.TraceMetrics.TraceMetricsSnapshot class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with TestServer { implicit lazy val system: ActorSystem = ActorSystem("client-request-instrumentation-spec", ConfigFactory.parseString( @@ -78,12 +78,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token)) + request.headers should contain(RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - testContext.map(_.finish(Map.empty)) + testContext.finish() } } @@ -106,12 +106,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit // Receive the request and reply back val request = server.expectMsgType[HttpRequest] - request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.get.token)) + request.headers should not contain (RawHeader(Kamon(Spray).traceTokenHeaderName, testContext.token)) // Finish the request cycle, just to avoid error messages on the logs. server.reply(HttpResponse(entity = "ok")) client.expectMsgType[HttpResponse] - testContext.map(_.finish(Map.empty)) + testContext.finish() } } @@ -143,12 +143,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit client.expectMsgType[HttpResponse] // Finish the trace - testContext.map(_.finish(Map.empty)) + testContext.finish() val traceMetrics = expectTraceMetrics("pipelining-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) @@ -184,12 +184,12 @@ class ClientRequestInstrumentationSpec extends TestKitBase with WordSpecLike wit client.expectMsgType[HttpResponse] // Finish the trace - testContext.map(_.finish(Map.empty)) + testContext.finish() val traceMetrics = expectTraceMetrics("internal-strategy-client-request", metricListener, 3 seconds) traceMetrics.elapsedTime.numberOfMeasurements should be(1L) traceMetrics.segments should not be empty - val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[HttpClientRequest] } map (_._2) + val recordedSegment = traceMetrics.segments.find { case (k, v) ⇒ k.isInstanceOf[SegmentMetricIdentity] } map (_._2) recordedSegment should not be empty recordedSegment map { segmentMetrics ⇒ segmentMetrics.numberOfMeasurements should be(1L) |