aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-06-16 15:52:14 -0300
committerDiego <diegolparra@gmail.com>2014-06-16 15:52:14 -0300
commit4abab8df49d1bc5d9a051a8b54852e0712be7b74 (patch)
tree3e8be92dd0b458c388cfae54da1f6318a9d68f0e
parente3d16846beb4ce0a629a79430d8742f369a73a32 (diff)
downloadKamon-4abab8df49d1bc5d9a051a8b54852e0712be7b74.tar.gz
Kamon-4abab8df49d1bc5d9a051a8b54852e0712be7b74.tar.bz2
Kamon-4abab8df49d1bc5d9a051a8b54852e0712be7b74.zip
+ play: refactor in RequestInstrumentation in order to propagate the TraceContext through the filters and all actions in the incoming request
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala3
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala91
-rw-r--r--kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala54
3 files changed, 83 insertions, 65 deletions
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index 0d33ba1d..028e9608 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -23,7 +23,7 @@ import akka.util.ByteString
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
-import java.text.{DecimalFormatSymbols, DecimalFormat}
+import java.text.{ DecimalFormatSymbols, DecimalFormat }
import kamon.metrics.{ MetricIdentity, MetricGroupIdentity }
import java.util.Locale
@@ -35,7 +35,6 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
val symbols = DecimalFormatSymbols.getInstance(Locale.US)
symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
-
// Absurdly high number of decimal digits, let the other end lose precision if it needs to.
val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
index af11a07a..00170b1b 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
@@ -16,42 +16,62 @@
package kamon.play.instrumentation
-import kamon.trace.{ TraceRecorder, TraceContextAware }
import kamon.Kamon
import kamon.play.Play
-import play.libs.Akka
-import play.api.mvc._
-import akka.actor.ActorSystem
+import kamon.trace.{ TraceContextAware, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
-import scala.Some
+import play.api.mvc._
+import play.libs.Akka
@Aspect
class RequestInstrumentation {
- @DeclareMixin("play.api.mvc.RequestHeader$$anon$4")
+ @DeclareMixin("play.api.mvc.RequestHeader+")
def mixinContextAwareNewRequest: TraceContextAware = TraceContextAware.default
- @Pointcut("execution(* play.api.GlobalSettings+.onStart(*)) && args(application)")
- def onStart(application: play.api.Application): Unit = {}
-
- @After("onStart(application)")
+ @After("execution(* play.api.GlobalSettings+.onStart(*)) && args(application)")
def afterApplicationStart(application: play.api.Application): Unit = {
Kamon(Play)(Akka.system())
}
- @Pointcut("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)")
- def doFilter(next: EssentialAction): Unit = {}
+ @Before("execution(* play.api.GlobalSettings+.onRouteRequest(..)) && args(requestHeader)")
+ def onRouteRequest(requestHeader: RequestHeader): Unit = {
+ val system = Akka.system()
+ val playExtension = Kamon(Play)(system)
+ val defaultTraceName: String = s"${requestHeader.method}: ${requestHeader.uri}"
- @Around("doFilter(next)")
- def afterDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = {
- Filters(pjp.proceed(Array(next)).asInstanceOf[EssentialAction], kamonRequestFilter)
+ val token = if (playExtension.includeTraceToken) {
+ requestHeader.headers.toSimpleMap.find(_._1 == playExtension.traceTokenHeaderName).map(_._2)
+ } else None
+
+ TraceRecorder.start(defaultTraceName, token)(system)
}
- @Pointcut("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)")
- def onError(request: TraceContextAware, ex: Throwable): Unit = {}
+ @Around("execution(* play.api.GlobalSettings+.doFilter(*)) && args(next)")
+ def afterDoFilter(pjp: ProceedingJoinPoint, next: EssentialAction): Any = {
+ val essentialAction = (requestHeader: RequestHeader) ⇒ {
+
+ val incomingContext = TraceRecorder.currentContext
+ val executor = Kamon(Play)(Akka.system()).defaultDispatcher
+
+ next(requestHeader).map {
+ result ⇒
+ TraceRecorder.finish()
+ incomingContext match {
+ case None ⇒ result
+ case Some(traceContext) ⇒
+ val playExtension = Kamon(Play)(traceContext.system)
+ if (playExtension.includeTraceToken) {
+ result.withHeaders(playExtension.traceTokenHeaderName -> traceContext.token)
+ } else result
+ }
+ }(executor)
+ }
+ pjp.proceed(Array(EssentialAction(essentialAction)))
+ }
- @Around("onError(request, ex)")
+ @Around("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)")
def aroundOnError(pjp: ProceedingJoinPoint, request: TraceContextAware, ex: Throwable): Any = request.traceContext match {
case None ⇒ pjp.proceed()
case Some(ctx) ⇒ {
@@ -60,39 +80,4 @@ class RequestInstrumentation {
pjp.proceed()
}
}
-
- private[this] val kamonRequestFilter = Filter { (nextFilter, requestHeader) ⇒
-
- processRequest(requestHeader)
-
- val incomingContext = TraceRecorder.currentContext
- val executor = Kamon(Play)(Akka.system()).defaultDispatcher
-
- nextFilter(requestHeader).map { result ⇒
-
- TraceRecorder.finish()
-
- val simpleResult = incomingContext match {
- case None ⇒ result
- case Some(traceContext) ⇒
- val playExtension = Kamon(Play)(traceContext.system)
- if (playExtension.includeTraceToken) {
- result.withHeaders(playExtension.traceTokenHeaderName -> traceContext.token)
- } else result
- }
- simpleResult
- }(executor)
- }
-
- private[this] def processRequest(requestHeader: RequestHeader): Unit = {
- val system: ActorSystem = Akka.system()
- val playExtension = Kamon(Play)(system)
- val defaultTraceName: String = s"${requestHeader.method}: ${requestHeader.uri}"
-
- val token = if (playExtension.includeTraceToken) {
- requestHeader.headers.toSimpleMap.find(_._1 == playExtension.traceTokenHeaderName).map(_._2)
- } else None
-
- TraceRecorder.start(defaultTraceName, token)(system)
- }
}
diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
index 8cf2d8e1..710c6ed5 100644
--- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
@@ -16,15 +16,16 @@
package kamon.play
-import play.api.test._
-import play.api.test.Helpers._
+import kamon.play.action.TraceName
+import kamon.trace.{ TraceLocal, TraceRecorder }
import org.scalatestplus.play._
-import play.api.mvc.{ Results, Action }
+import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.Results.Ok
-import scala.Some
+import play.api.mvc._
+import play.api.test.Helpers._
+import play.api.test._
+
import scala.concurrent.Future
-import kamon.play.action.TraceName
-import kamon.trace.TraceRecorder
class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
@@ -32,7 +33,8 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
val executor = scala.concurrent.ExecutionContext.Implicits.global
- implicit override lazy val app = FakeApplication(withRoutes = {
+ implicit override lazy val app = FakeApplication(withGlobal = Some(MockGlobalTest), withRoutes = {
+
case ("GET", "/async") ⇒
Action.async {
Future {
@@ -59,16 +61,23 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
}(executor)
}
}
+ case ("GET", "/retrieve") ⇒
+ Action {
+ Ok("retrieve from TraceLocal")
+ }
})
private val traceTokenValue = "kamon-trace-token-test"
private val traceTokenHeaderName = "X-Trace-Token"
private val expectedToken = Some(traceTokenValue)
private val traceTokenHeader = traceTokenHeaderName -> traceTokenValue
+ private val traceLocalStorageValue = "localStorageValue"
+ private val traceLocalStorageKey = "localStorageKey"
+ private val traceLocalStorageHeader = traceLocalStorageKey -> traceLocalStorageValue
"the Request instrumentation" should {
"respond to the Async Action with X-Trace-Token" in {
- val Some(result) = route(FakeRequest(GET, "/async").withHeaders(traceTokenHeader))
+ val Some(result) = route(FakeRequest(GET, "/async").withHeaders(traceTokenHeader, traceLocalStorageHeader))
header(traceTokenHeaderName, result) must be(expectedToken)
}
@@ -90,9 +99,34 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
"respond to the Async Action with X-Trace-Token and the renamed trace" in {
val Some(result) = route(FakeRequest(GET, "/async-renamed").withHeaders(traceTokenHeader))
- Thread.sleep(3000)
+ Thread.sleep(500) // wait to complete the future
TraceRecorder.currentContext.map(_.name) must be(Some("renamed-trace"))
header(traceTokenHeaderName, result) must be(expectedToken)
}
+
+ "propagate the TraceContext and LocalStorage through of filters in the current request" in {
+ val Some(result) = route(FakeRequest(GET, "/retrieve").withHeaders(traceTokenHeader, traceLocalStorageHeader))
+ TraceLocal.retrieve(TraceLocalKey).get must be(traceLocalStorageValue)
+ }
}
-} \ No newline at end of file
+
+ object MockGlobalTest extends WithFilters(TraceLocalFilter)
+
+ object TraceLocalKey extends TraceLocal.TraceLocalKey {
+ type ValueType = String
+ }
+
+ object TraceLocalFilter extends Filter {
+ override def apply(next: (RequestHeader) ⇒ Future[Result])(header: RequestHeader): Future[Result] = {
+ TraceRecorder.withTraceContext(TraceRecorder.currentContext) {
+
+ TraceLocal.store(TraceLocalKey)(header.headers.get(traceLocalStorageKey).getOrElse("unknown"))
+
+ next(header).map {
+ result ⇒ result.withHeaders((traceLocalStorageKey -> TraceLocal.retrieve(TraceLocalKey).get))
+ }
+ }
+ }
+ }
+}
+