aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-01-31 18:13:18 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-01-31 18:13:18 -0300
commitb51adf72242b953b2d9723491ef81473138ec107 (patch)
treec0c4473667449cff09fe16660365f9f3c292d8f2 /kamon-spray
parent59c01d880379dfc48c6d82da13ef628a587a9bbb (diff)
downloadKamon-b51adf72242b953b2d9723491ef81473138ec107.tar.gz
Kamon-b51adf72242b953b2d9723491ef81473138ec107.tar.bz2
Kamon-b51adf72242b953b2d9723491ef81473138ec107.zip
update spray server request instrumentation tests
Diffstat (limited to 'kamon-spray')
-rw-r--r--kamon-spray/src/main/resources/META-INF/aop.xml2
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala (renamed from kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala)56
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala (renamed from kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala)66
3 files changed, 78 insertions, 46 deletions
diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml
index 0e83f7e3..2f1db767 100644
--- a/kamon-spray/src/main/resources/META-INF/aop.xml
+++ b/kamon-spray/src/main/resources/META-INF/aop.xml
@@ -3,7 +3,7 @@
<aspectj>
<aspects>
<!-- Spray Server -->
- <aspect name="spray.can.server.ServerRequestTracing"/>
+ <aspect name="spray.can.server.ServerRequestInstrumentation"/>
<!-- Spray Client -->
<aspect name="spray.can.client.ClientRequestTracing"/>
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
index b7479d2b..244d66ed 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala
@@ -16,16 +16,18 @@
package spray.can.server
import org.aspectj.lang.annotation._
-import kamon.trace.{ TraceRecorder, TraceContextAware }
+import kamon.trace.{ TraceContext, TraceRecorder, TraceContextAware }
import akka.actor.ActorSystem
-import spray.http.HttpRequest
+import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest }
import akka.event.Logging.Warning
import scala.Some
import kamon.Kamon
import kamon.spray.Spray
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http.HttpHeaders.RawHeader
@Aspect
-class ServerRequestTracing {
+class ServerRequestInstrumentation {
@DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default
@@ -57,30 +59,52 @@ class ServerRequestTracing {
TraceRecorder.clearContext
}
- @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)")
- def openRequestCreation(openRequest: TraceContextAware): Unit = {}
+ @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)")
+ def openRequestCreation(openRequest: TraceContextAware, response: HttpMessagePartWrapper): Unit = {}
- @After("openRequestCreation(openRequest)")
- def afterFinishingRequest(openRequest: TraceContextAware): Unit = {
- val storedContext = openRequest.traceContext
+ @Around("openRequestCreation(openRequest, response)")
+ def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = {
val incomingContext = TraceRecorder.currentContext
+ val storedContext = openRequest.traceContext
+
+ verifyTraceContextConsistency(incomingContext, storedContext)
+ val proceedResult = incomingContext match {
+ case None ⇒ pjp.proceed()
+ case Some(traceContext) ⇒
+ val sprayExtension = Kamon(Spray)(traceContext.system)
+
+ if (sprayExtension.includeTraceToken) {
+ val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, traceContext.token)
+ pjp.proceed(Array(openRequest, responseWithHeader))
+
+ } else pjp.proceed
+ }
+
TraceRecorder.finish()
+ proceedResult
+ }
- for (original ← storedContext) {
- incomingContext match {
+ def verifyTraceContextConsistency(incomingTraceContext: Option[TraceContext], storedTraceContext: Option[TraceContext]): Unit = {
+ for (original ← storedTraceContext) {
+ incomingTraceContext match {
case Some(incoming) if original.token != incoming.token ⇒
- publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]")
+ publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]", incoming.system)
case Some(_) ⇒ // nothing to do here.
case None ⇒
- publishWarning(s"Trace context not present while closing the Trace: [$original]")
+ publishWarning(s"Trace context not present while closing the Trace: [$original]", original.system)
}
}
- def publishWarning(text: String): Unit = {
- val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
- system.eventStream.publish(Warning("", classOf[ServerRequestTracing], text))
- }
+ def publishWarning(text: String, system: ActorSystem): Unit =
+ system.eventStream.publish(Warning("", classOf[ServerRequestInstrumentation], text))
+
}
+
+ def includeTraceTokenIfPossible(response: HttpMessagePartWrapper, traceTokenHeaderName: String, token: String): HttpMessagePartWrapper =
+ response match {
+ case response: HttpResponse ⇒ response.withHeaders(RawHeader(traceTokenHeaderName, token))
+ case other ⇒ other
+ }
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index d14ea6b1..9b4acdd9 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -33,55 +33,63 @@ import spray.http.HttpHeaders.Host
import akka.io.{ Tcp, IO }
import spray.can.Http
import akka.io.Tcp.Bound
+import kamon.metrics.{TraceMetrics, Metrics}
+import kamon.metrics.TraceMetrics.TraceMetricSnapshot
+import kamon.metrics.Subscriptions.TickMetricSnapshot
-class ServerRequestTracingSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer {
+class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer {
"the spray server request tracing instrumentation" should {
- "reply back with a trace token header" in {
+ "reply back with the same trace token header provided in the request" in {
val (connection, server) = buildServer()
val client = TestProbe()
- client.send(connection, Get("/"))
+ client.send(connection, Get("/").withHeaders(RawHeader("X-Trace-Token", "reply-trace-token")))
server.expectMsgType[HttpRequest]
server.reply(HttpResponse(entity = "ok"))
- client.expectMsgType[HttpResponse]
+ val response = client.expectMsgType[HttpResponse]
- fail()
+ response.headers should contain(RawHeader("X-Trace-Token", "reply-trace-token"))
}
- /* "finish a request even if no TraceContext is received in the response" in {
- send {
- Get(s"http://127.0.0.1:$port/clearcontext")
- }
+ "reply back with a automatically assigned trace token if none was provided with the request" in {
+ val (connection, server) = buildServer()
+ val client = TestProbe()
+
+ client.send(connection, Get("/"))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ val response = client.expectMsgType[HttpResponse]
+
+ response.headers.filter(_.name == "X-Trace-Token").size should be(1)
- within(5 seconds) {
- fishForNamedTrace("clearcontext")
- }
}
- "give a initial transaction name using the method and path from the request" in {
- send {
- Get(s"http://127.0.0.1:$port/accounts")
- }
+ "open and finish a trace during the lifetime of a request" in {
+ val (connection, server) = buildServer()
+ val client = TestProbe()
+
+ val metricListener = TestProbe()
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
+
+ client.send(connection, Get("/open-and-finish"))
+ server.expectMsgType[HttpRequest]
+ server.reply(HttpResponse(entity = "ok"))
+ client.expectMsgType[HttpResponse]
- within(5 seconds) {
- fishForNamedTrace("accounts")
+ metricListener.fishForMessage() {
+ case snapshot @ TickMetricSnapshot(_, _, metrics) => metrics.keys.exists(_.name.contains("open-and-finish"))
+ case other => false
}
- }*/
- }
- /*
- - si no llega uow, crear una
- - si llega con uow hay que propagarla
- */
-
- def fishForNamedTrace(traceName: String) = fishForMessage() {
- case trace: UowTrace if trace.name.contains(traceName) ⇒ true
- case _ ⇒ false
+ }
+
}
+
+
}
-trait TestServer extends SimpleRoutingApp {
+trait TestServer {
self: TestKit ⇒
def buildServer(): (ActorRef, TestProbe) = {