diff options
Diffstat (limited to 'kamon-spray')
-rw-r--r-- | kamon-spray/src/main/resources/META-INF/aop.xml | 2 | ||||
-rw-r--r-- | kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala (renamed from kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala) | 56 | ||||
-rw-r--r-- | kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala (renamed from kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala) | 66 |
3 files changed, 78 insertions, 46 deletions
diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml index 0e83f7e3..2f1db767 100644 --- a/kamon-spray/src/main/resources/META-INF/aop.xml +++ b/kamon-spray/src/main/resources/META-INF/aop.xml @@ -3,7 +3,7 @@ <aspectj> <aspects> <!-- Spray Server --> - <aspect name="spray.can.server.ServerRequestTracing"/> + <aspect name="spray.can.server.ServerRequestInstrumentation"/> <!-- Spray Client --> <aspect name="spray.can.client.ClientRequestTracing"/> diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala index b7479d2b..244d66ed 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala @@ -16,16 +16,18 @@ package spray.can.server import org.aspectj.lang.annotation._ -import kamon.trace.{ TraceRecorder, TraceContextAware } +import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware } import akka.actor.ActorSystem -import spray.http.HttpRequest +import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } import akka.event.Logging.Warning import scala.Some import kamon.Kamon import kamon.spray.Spray +import org.aspectj.lang.ProceedingJoinPoint +import spray.http.HttpHeaders.RawHeader @Aspect -class ServerRequestTracing { +class ServerRequestInstrumentation { @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default @@ -57,30 +59,52 @@ class ServerRequestTracing { TraceRecorder.clearContext } - @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") - def openRequestCreation(openRequest: TraceContextAware): Unit = {} + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)") + def openRequestCreation(openRequest: TraceContextAware, response: HttpMessagePartWrapper): Unit = {} - @After("openRequestCreation(openRequest)") - def afterFinishingRequest(openRequest: TraceContextAware): Unit = { - val storedContext = openRequest.traceContext + @Around("openRequestCreation(openRequest, response)") + def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = { val incomingContext = TraceRecorder.currentContext + val storedContext = openRequest.traceContext + + verifyTraceContextConsistency(incomingContext, storedContext) + val proceedResult = incomingContext match { + case None ⇒ pjp.proceed() + case Some(traceContext) ⇒ + val sprayExtension = Kamon(Spray)(traceContext.system) + + if (sprayExtension.includeTraceToken) { + val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token) + pjp.proceed(Array(openRequest, responseWithHeader)) + + } else pjp.proceed + } + TraceRecorder.finish() + proceedResult + } - for (original ← storedContext) { - incomingContext match { + def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = { + for (original ← storedTraceContext) { + incomingTraceContext match { case Some(incoming) if original.token != incoming.token ⇒ - publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]") + publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system) case Some(_) ⇒ // nothing to do here. case None ⇒ - publishWarning(s"Trace context not present while closing the Trace: [$original]") + publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system) } } - def publishWarning(text: String): Unit = { - val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system - system.eventStream.publish(Warning("", classOf[ServerRequestTracing], text)) - } + def publishWarning(text: String, system: ActorSystem): Unit = + system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text)) + } + + def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper = + response match { + case response: HttpResponse ⇒ response.withHeaders(RawHeader(traceTokenHeaderName, token)) + case other ⇒ other + } } diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala index d14ea6b1..9b4acdd9 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala @@ -33,55 +33,63 @@ import spray.http.HttpHeaders.Host import akka.io.{ Tcp, IO } import spray.can.Http import akka.io.Tcp.Bound +import kamon.metrics.{TraceMetrics, Metrics} +import kamon.metrics.TraceMetrics.TraceMetricSnapshot +import kamon.metrics.Subscriptions.TickMetricSnapshot -class ServerRequestTracingSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer { +class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer { "the spray server request tracing instrumentation" should { - "reply back with a trace token header" in { + "reply back with the same trace token header provided in the request" in { val (connection, server) = buildServer() val client = TestProbe() - client.send(connection, Get("/")) + client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token"))) server.expectMsgType[HttpRequest] server.reply(HttpResponse(entity = "ok")) - client.expectMsgType[HttpResponse] + val response = client.expectMsgType[HttpResponse] - fail() + response.headers should contain(RawHeader("X-Trace-Token", "reply-trace-token")) } - /* "finish a request even if no TraceContext is received in the response" in { - send { - Get(s"http://127.0.0.1:$port/clearcontext") - } + "reply back with a automatically assigned trace token if none was provided with the request" in { + val (connection, server) = buildServer() + val client = TestProbe() + + client.send(connection, Get("/")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + val response = client.expectMsgType[HttpResponse] + + response.headers.filter(_.name == "X-Trace-Token").size should be(1) - within(5 seconds) { - fishForNamedTrace("clearcontext") - } } - "give a initial transaction name using the method and path from the request" in { - send { - Get(s"http://127.0.0.1:$port/accounts") - } + "open and finish a trace during the lifetime of a request" in { + val (connection, server) = buildServer() + val client = TestProbe() + + val metricListener = TestProbe() + Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) + + client.send(connection, Get("/open-and-finish")) + server.expectMsgType[HttpRequest] + server.reply(HttpResponse(entity = "ok")) + client.expectMsgType[HttpResponse] - within(5 seconds) { - fishForNamedTrace("accounts") + metricListener.fishForMessage() { + case snapshot @ TickMetricSnapshot(_, _, metrics) => metrics.keys.exists(_.name.contains("open-and-finish")) + case other => false } - }*/ - } - /* - - si no llega uow, crear una - - si llega con uow hay que propagarla - */ - - def fishForNamedTrace(traceName: String) = fishForMessage() { - case trace: UowTrace if trace.name.contains(traceName) ⇒ true - case _ ⇒ false + } + } + + } -trait TestServer extends SimpleRoutingApp { +trait TestServer { self: TestKit ⇒ def buildServer(): (ActorRef, TestProbe) = { |