aboutsummaryrefslogtreecommitdiff
path: root/kamon-play
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-09-09 00:58:03 -0300
committerDiego <diegolparra@gmail.com>2014-09-09 01:05:42 -0300
commit6884c0f3f7bf9376e9eaf4f330d7622c142399e3 (patch)
tree5953082053452a644b0f3323f4216c1808ee3ccb /kamon-play
parent23785a41dc3a0e9651ba87bc9dc255932ea64bd6 (diff)
downloadKamon-6884c0f3f7bf9376e9eaf4f330d7622c142399e3.tar.gz
Kamon-6884c0f3f7bf9376e9eaf4f330d7622c142399e3.tar.bz2
Kamon-6884c0f3f7bf9376e9eaf4f330d7622c142399e3.zip
= play:
* remove from publishErrorMessage method * refactor onError method in RequestInstrumentation * refactor WSInstrumentation in order to propagate the TraceContext when a WS call is executed outside an Action * improve tests * closes #33
Diffstat (limited to 'kamon-play')
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala23
-rw-r--r--kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala46
-rw-r--r--kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala23
-rw-r--r--kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala92
4 files changed, 102 insertions, 82 deletions
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
index ebac38d9..8d8de230 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
@@ -20,6 +20,7 @@ import kamon.play.{ Play, PlayExtension }
import kamon.trace.{ TraceContextAware, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
+import play.api.mvc.Results._
import play.api.mvc._
import play.libs.Akka
@@ -34,7 +35,7 @@ class RequestInstrumentation {
Kamon(Play)(Akka.system())
}
- @Before("execution(* play.api.GlobalSettings+.onRouteRequest(..)) && args(requestHeader)")
+ @Before("call(* play.api.GlobalSettings+.onRouteRequest(..)) && args(requestHeader)")
def onRouteRequest(requestHeader: RequestHeader): Unit = {
val system = Akka.system()
val playExtension = Kamon(Play)(system)
@@ -57,10 +58,9 @@ class RequestInstrumentation {
next(requestHeader).map {
result ⇒
TraceRecorder.finish()
-
incomingContext.map { ctx ⇒
val playExtension = Kamon(Play)(ctx.system)
- recordHttpServerMetrics(result, ctx.name, playExtension)
+ recordHttpServerMetrics(result.header, ctx.name, playExtension)
if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token)
else result
}.getOrElse(result)
@@ -69,16 +69,11 @@ class RequestInstrumentation {
pjp.proceed(Array(EssentialAction(essentialAction)))
}
- private def recordHttpServerMetrics(result: Result, traceName: String, playExtension: PlayExtension): Unit =
- playExtension.httpServerMetrics.recordResponse(traceName, result.header.status.toString, 1L)
-
- @Around("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)")
- def aroundOnError(pjp: ProceedingJoinPoint, request: TraceContextAware, ex: Throwable): Any = request.traceContext match {
- case None ⇒ pjp.proceed()
- case Some(ctx) ⇒ {
- val actorSystem = ctx.system
- Kamon(Play)(actorSystem).publishErrorMessage(actorSystem, ex.getMessage, ex)
- pjp.proceed()
- }
+ @Before("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)")
+ def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = request.traceContext.map {
+ ctx ⇒ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(ctx.system))
}
+
+ private def recordHttpServerMetrics(header: ResponseHeader, traceName: String, playExtension: PlayExtension): Unit =
+ playExtension.httpServerMetrics.recordResponse(traceName, header.status.toString)
}
diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
index 2862ba19..3e4d6110 100644
--- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
+++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
@@ -16,15 +16,16 @@
package kamon.play.instrumentation
-import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect }
-import org.aspectj.lang.ProceedingJoinPoint
-import kamon.trace.TraceRecorder
import kamon.metric.TraceMetrics.HttpClientRequest
-import play.api.libs.ws.WSRequest
-import scala.concurrent.Future
-import play.api.libs.ws.WSResponse
-import scala.util.{ Failure, Success }
+import kamon.trace.{ TraceContext, TraceRecorder }
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
+import play.api.libs.ws.ning.NingWSRequest
+import play.api.libs.ws.{ WSRequest, WSResponse }
+import play.libs.Akka
+
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
@Aspect
class WSInstrumentation {
@@ -34,27 +35,36 @@ class WSInstrumentation {
@Around("onExecuteRequest(request)")
def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = {
- import WSInstrumentation._
- val completionHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request))
+ import kamon.play.instrumentation.WSInstrumentation._
- val response = pjp.proceed().asInstanceOf[Future[WSResponse]]
+ withOrNewTraceContext(TraceRecorder.currentContext)(request) {
+ val response = pjp.proceed().asInstanceOf[Future[WSResponse]]
+ val segmentHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request))
- response.onComplete {
- case Failure(t) ⇒ completionHandle.map(_.finish(Map("completed-with-error" -> t.getMessage)))
- case Success(_) ⇒ completionHandle.map(_.finish(Map.empty))
+ response.map {
+ r ⇒
+ segmentHandle.map(_.finish())
+ TraceRecorder.finish()
+ }
+ response
}
-
- response
}
}
object WSInstrumentation {
+ def uri(request: WSRequest): java.net.URI = request.asInstanceOf[NingWSRequest].builder.build().getURI
+
def basicRequestAttributes(request: WSRequest): Map[String, String] = {
Map[String, String](
- "host" -> request.header("host").getOrElse("Unknown"),
- "path" -> request.method)
+ "host" -> uri(request).getHost,
+ "path" -> uri(request).getPath,
+ "method" -> request.method)
}
-}
+ def withOrNewTraceContext[T](context: Option[TraceContext])(request: WSRequest)(thunk: ⇒ T): T = {
+ if (context.isDefined) TraceRecorder.withTraceContext(context) { thunk }
+ else TraceRecorder.withNewTraceContext(request.url, metadata = basicRequestAttributes(request)) { thunk }(Akka.system())
+ }
+} \ No newline at end of file
diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
index fc195580..00adb99b 100644
--- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
@@ -15,13 +15,14 @@
package kamon.play
-import scala.concurrent.duration._
import kamon.Kamon
import kamon.http.HttpServerMetrics
import kamon.metric.{ CollectionContext, Metrics }
import kamon.play.action.TraceName
import kamon.trace.{ TraceLocal, TraceRecorder }
import org.scalatestplus.play._
+import play.api.DefaultGlobal
+import play.api.http.Writeable
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.Results.Ok
import play.api.mvc._
@@ -29,6 +30,7 @@ import play.api.test.Helpers._
import play.api.test._
import play.libs.Akka
+import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
@@ -49,6 +51,11 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
Action {
Results.NotFound
}
+ case ("GET", "/error") ⇒
+ Action {
+ throw new Exception("This page generates an error!")
+ Ok("This page will generate an error!")
+ }
case ("GET", "/redirect") ⇒
Action {
Results.Redirect("/redirected", MOVED_PERMANENTLY)
@@ -125,11 +132,17 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
Await.result(route(FakeRequest(GET, "/notFound").withHeaders(traceTokenHeader)).get, 10 seconds)
}
+ for (repetition ← 1 to 5) {
+ Await.result(routeWithOnError(FakeRequest(GET, "/error").withHeaders(traceTokenHeader)).get, 10 seconds)
+ }
+
val snapshot = Kamon(Metrics)(Akka.system()).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext)
snapshot.countsPerTraceAndStatusCode("GET: /default")("200").count must be(10)
snapshot.countsPerTraceAndStatusCode("GET: /notFound")("404").count must be(5)
+ snapshot.countsPerTraceAndStatusCode("GET: /error")("500").count must be(5)
snapshot.countsPerStatusCode("200").count must be(10)
snapshot.countsPerStatusCode("404").count must be(5)
+ snapshot.countsPerStatusCode("500").count must be(5)
}
}
@@ -151,5 +164,13 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite {
}
}
}
+
+ def routeWithOnError[T](req: Request[T])(implicit w: Writeable[T]): Option[Future[Result]] = {
+ route(req).map { result ⇒
+ result.recoverWith {
+ case t: Throwable ⇒ DefaultGlobal.onError(req, t)
+ }
+ }
+ }
}
diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
index 775d3e26..cbd95db3 100644
--- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
+++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
@@ -16,70 +16,64 @@
package kamon.play
+import kamon.Kamon
+import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot }
+import kamon.metric.{ Metrics, TraceMetrics }
+import org.scalatest.{ Matchers, WordSpecLike }
+import org.scalatestplus.play.OneServerPerSuite
+import play.api.libs.ws.WS
import play.api.mvc.Action
import play.api.mvc.Results.Ok
-import play.api.libs.ws.WS
-import org.scalatestplus.play.OneServerPerSuite
-import play.api.test._
import play.api.test.Helpers._
-import akka.actor.ActorSystem
-import akka.testkit.{ TestKitBase, TestProbe }
+import play.api.test._
+import play.libs.Akka
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{ Matchers, WordSpecLike }
-import kamon.Kamon
-import kamon.metric.{ TraceMetrics, Metrics }
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.TraceMetrics.ElapsedTime
+import scala.concurrent.Await
+import scala.concurrent.duration._
-class WSInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with OneServerPerSuite {
+class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPerSuite {
System.setProperty("config.file", "./kamon-play/src/test/resources/conf/application.conf")
- import scala.collection.immutable.StringLike._
- implicit lazy val system: ActorSystem = ActorSystem("play-ws-instrumentation-spec", ConfigFactory.parseString(
- """
- |akka {
- | loglevel = ERROR
- |}
- |
- |kamon {
- | metrics {
- | tick-interval = 2 seconds
- |
- | filters = [
- | {
- | trace {
- | includes = [ "*" ]
- | excludes = []
- | }
- | }
- | ]
- | }
- |}
- """.stripMargin))
-
implicit override lazy val app = FakeApplication(withRoutes = {
- case ("GET", "/async") ⇒ Action { Ok("ok") }
+ case ("GET", "/async") ⇒ Action { Ok("ok") }
+ case ("GET", "/outside") ⇒ Action { Ok("ok") }
+ case ("GET", "/inside") ⇒ callWSinsideController("http://localhost:19001/async")
})
"the WS instrumentation" should {
- "respond to the Async Action and complete the WS request" in {
+ "propagate the TraceContext outside an Action and complete the WS request" in {
+ Await.result(WS.url("http://localhost:19001/outside").get(), 10 seconds)
+
+ val snapshot = takeSnapshotOf("http://localhost:19001/outside")
+ snapshot.elapsedTime.numberOfMeasurements should be(1)
+ snapshot.segments.size should be(1)
+ snapshot.segments(HttpClientRequest("http://localhost:19001/outside")).numberOfMeasurements should be(1)
+ }
- val metricListener = TestProbe()
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true)
- metricListener.expectMsgType[TickMetricSnapshot]
+ "propagate the TraceContext inside an Action and complete the WS request" in {
+ Await.result(route(FakeRequest(GET, "/inside")).get, 10 seconds)
+
+ val snapshot = takeSnapshotOf("GET: /inside")
+ snapshot.elapsedTime.numberOfMeasurements should be(2)
+ snapshot.segments.size should be(1)
+ snapshot.segments(HttpClientRequest("http://localhost:19001/async")).numberOfMeasurements should be(1)
+ }
+
+ }
+
+ def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
+ val recorder = Kamon(Metrics)(Akka.system()).register(TraceMetrics(traceName), TraceMetrics.Factory)
+ val collectionContext = Kamon(Metrics)(Akka.system()).buildDefaultCollectionContext
+ recorder.get.collect(collectionContext)
+ }
- val response = await(WS.url("http://localhost:19001/async").get())
- response.status should be(OK)
+ def callWSinsideController(url: String) = Action.async {
+ import play.api.libs.concurrent.Execution.Implicits.defaultContext
+ import play.api.Play.current
- // val tickSnapshot = metricListener.expectMsgType[TickMetricSnapshot]
- // val traceMetrics = tickSnapshot.metrics.find { case (k, v) ⇒ k.name.contains("async") } map (_._2.metrics)
- // traceMetrics should not be empty
- //
- // traceMetrics map { metrics ⇒
- // metrics(ElapsedTime).numberOfMeasurements should be(1L)
- // }
+ WS.url(url).get().map { response ⇒
+ Ok("Ok")
}
}
} \ No newline at end of file