From 4abab8df49d1bc5d9a051a8b54852e0712be7b74 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 16 Jun 2014 15:52:14 -0300 Subject: + play: refactor in RequestInstrumentation in order to propagate the TraceContext through the filters and all actions in the incoming request --- .../instrumentation/RequestInstrumentation.scala | 91 +++++++++------------- .../kamon/play/RequestInstrumentationSpec.scala | 54 ++++++++++--- 2 files changed, 82 insertions(+), 63 deletions(-) (limited to 'kamon-play') diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index af11a07a..00170b1b 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -16,42 +16,62 @@ package kamon.play.instrumentation -import kamon.trace.{ TraceRecorder, TraceContextAware } import kamon.Kamon import kamon.play.Play -import play.libs.Akka -import play.api.mvc._ -import akka.actor.ActorSystem +import kamon.trace.{ TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ -import scala.Some +import play.api.mvc._ +import play.libs.Akka @Aspect class RequestInstrumentation { - @DeclareMixin("play.api.mvc.RequestHeader$$anon$4") + @DeclareMixin("play.api.mvc.RequestHeader+") def mixinContextAwareNewRequest: TraceContextAware = TraceContextAware.default - @Pointcut("execution(* play.api.GlobalSettings+.onStart(*)) && args(application)") - def onStart(application: play.api.Application): Unit = {} - - @After("onStart(application)") + @After("execution(* play.api.GlobalSettings+.onStart(*)) && args(application)") def afterApplicationStart(application: play.api.Application): Unit = { Kamon(Play)(Akka.system()) } - @Pointcut("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)") - def doFilter(next: EssentialAction): Unit = {} + @Before("execution(* play.api.GlobalSettings+.onRouteRequest(..)) && args(requestHeader)") + def onRouteRequest(requestHeader: RequestHeader): Unit = { + val system = Akka.system() + val playExtension = Kamon(Play)(system) + val defaultTraceName: String = s"${requestHeader.method}: ${requestHeader.uri}" - @Around("doFilter(next)") - def afterDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = { - Filters(pjp.proceed(Array(next)).asInstanceOf[EssentialAction], kamonRequestFilter) + val token = if (playExtension.includeTraceToken) { + requestHeader.headers.toSimpleMap.find(_._1 == playExtension.traceTokenHeaderName).map(_._2) + } else None + + TraceRecorder.start(defaultTraceName, token)(system) } - @Pointcut("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") - def onError(request: TraceContextAware, ex: Throwable): Unit = {} + @Around("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)") + def afterDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = { + val essentialAction = (requestHeader: RequestHeader) ⇒ { + + val incomingContext = TraceRecorder.currentContext + val executor = Kamon(Play)(Akka.system()).defaultDispatcher + + next(requestHeader).map { + result ⇒ + TraceRecorder.finish() + incomingContext match { + case None ⇒ result + case Some(traceContext) ⇒ + val playExtension = Kamon(Play)(traceContext.system) + if (playExtension.includeTraceToken) { + result.withHeaders(playExtension.traceTokenHeaderName -> traceContext.token) + } else result + } + }(executor) + } + pjp.proceed(Array(EssentialAction(essentialAction))) + } - @Around("onError(request, ex)") + @Around("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") def aroundOnError(pjp: ProceedingJoinPoint, request: TraceContextAware, ex: Throwable): Any = request.traceContext match { case None ⇒ pjp.proceed() case Some(ctx) ⇒ { @@ -60,39 +80,4 @@ class RequestInstrumentation { pjp.proceed() } } - - private[this] val kamonRequestFilter = Filter { (nextFilter, requestHeader) ⇒ - - processRequest(requestHeader) - - val incomingContext = TraceRecorder.currentContext - val executor = Kamon(Play)(Akka.system()).defaultDispatcher - - nextFilter(requestHeader).map { result ⇒ - - TraceRecorder.finish() - - val simpleResult = incomingContext match { - case None ⇒ result - case Some(traceContext) ⇒ - val playExtension = Kamon(Play)(traceContext.system) - if (playExtension.includeTraceToken) { - result.withHeaders(playExtension.traceTokenHeaderName -> traceContext.token) - } else result - } - simpleResult - }(executor) - } - - private[this] def processRequest(requestHeader: RequestHeader): Unit = { - val system: ActorSystem = Akka.system() - val playExtension = Kamon(Play)(system) - val defaultTraceName: String = s"${requestHeader.method}: ${requestHeader.uri}" - - val token = if (playExtension.includeTraceToken) { - requestHeader.headers.toSimpleMap.find(_._1 == playExtension.traceTokenHeaderName).map(_._2) - } else None - - TraceRecorder.start(defaultTraceName, token)(system) - } } diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala index 8cf2d8e1..710c6ed5 100644 --- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala @@ -16,15 +16,16 @@ package kamon.play -import play.api.test._ -import play.api.test.Helpers._ +import kamon.play.action.TraceName +import kamon.trace.{ TraceLocal, TraceRecorder } import org.scalatestplus.play._ -import play.api.mvc.{ Results, Action } +import play.api.libs.concurrent.Execution.Implicits.defaultContext import play.api.mvc.Results.Ok -import scala.Some +import play.api.mvc._ +import play.api.test.Helpers._ +import play.api.test._ + import scala.concurrent.Future -import kamon.play.action.TraceName -import kamon.trace.TraceRecorder class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { @@ -32,7 +33,8 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { val executor = scala.concurrent.ExecutionContext.Implicits.global - implicit override lazy val app = FakeApplication(withRoutes = { + implicit override lazy val app = FakeApplication(withGlobal = Some(MockGlobalTest), withRoutes = { + case ("GET", "/async") ⇒ Action.async { Future { @@ -59,16 +61,23 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { }(executor) } } + case ("GET", "/retrieve") ⇒ + Action { + Ok("retrieve from TraceLocal") + } }) private val traceTokenValue = "kamon-trace-token-test" private val traceTokenHeaderName = "X-Trace-Token" private val expectedToken = Some(traceTokenValue) private val traceTokenHeader = traceTokenHeaderName -> traceTokenValue + private val traceLocalStorageValue = "localStorageValue" + private val traceLocalStorageKey = "localStorageKey" + private val traceLocalStorageHeader = traceLocalStorageKey -> traceLocalStorageValue "the Request instrumentation" should { "respond to the Async Action with X-Trace-Token" in { - val Some(result) = route(FakeRequest(GET, "/async").withHeaders(traceTokenHeader)) + val Some(result) = route(FakeRequest(GET, "/async").withHeaders(traceTokenHeader, traceLocalStorageHeader)) header(traceTokenHeaderName, result) must be(expectedToken) } @@ -90,9 +99,34 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { "respond to the Async Action with X-Trace-Token and the renamed trace" in { val Some(result) = route(FakeRequest(GET, "/async-renamed").withHeaders(traceTokenHeader)) - Thread.sleep(3000) + Thread.sleep(500) // wait to complete the future TraceRecorder.currentContext.map(_.name) must be(Some("renamed-trace")) header(traceTokenHeaderName, result) must be(expectedToken) } + + "propagate the TraceContext and LocalStorage through of filters in the current request" in { + val Some(result) = route(FakeRequest(GET, "/retrieve").withHeaders(traceTokenHeader, traceLocalStorageHeader)) + TraceLocal.retrieve(TraceLocalKey).get must be(traceLocalStorageValue) + } } -} \ No newline at end of file + + object MockGlobalTest extends WithFilters(TraceLocalFilter) + + object TraceLocalKey extends TraceLocal.TraceLocalKey { + type ValueType = String + } + + object TraceLocalFilter extends Filter { + override def apply(next: (RequestHeader) ⇒ Future[Result])(header: RequestHeader): Future[Result] = { + TraceRecorder.withTraceContext(TraceRecorder.currentContext) { + + TraceLocal.store(TraceLocalKey)(header.headers.get(traceLocalStorageKey).getOrElse("unknown")) + + next(header).map { + result ⇒ result.withHeaders((traceLocalStorageKey -> TraceLocal.retrieve(TraceLocalKey).get)) + } + } + } + } +} + -- cgit v1.2.3