From 6884c0f3f7bf9376e9eaf4f330d7622c142399e3 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 9 Sep 2014 00:58:03 -0300 Subject: = play: * remove from publishErrorMessage method * refactor onError method in RequestInstrumentation * refactor WSInstrumentation in order to propagate the TraceContext when a WS call is executed outside an Action * improve tests * closes #33 --- .../instrumentation/RequestInstrumentation.scala | 23 +++--- .../play/instrumentation/WSInstrumentation.scala | 46 ++++++----- .../kamon/play/RequestInstrumentationSpec.scala | 23 +++++- .../scala/kamon/play/WSInstrumentationSpec.scala | 92 ++++++++++------------ 4 files changed, 102 insertions(+), 82 deletions(-) (limited to 'kamon-play/src') diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index ebac38d9..8d8de230 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -20,6 +20,7 @@ import kamon.play.{ Play, PlayExtension } import kamon.trace.{ TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ +import play.api.mvc.Results._ import play.api.mvc._ import play.libs.Akka @@ -34,7 +35,7 @@ class RequestInstrumentation { Kamon(Play)(Akka.system()) } - @Before("execution(* play.api.GlobalSettings+.onRouteRequest(..)) && args(requestHeader)") + @Before("call(* play.api.GlobalSettings+.onRouteRequest(..)) && args(requestHeader)") def onRouteRequest(requestHeader: RequestHeader): Unit = { val system = Akka.system() val playExtension = Kamon(Play)(system) @@ -57,10 +58,9 @@ class RequestInstrumentation { next(requestHeader).map { result ⇒ TraceRecorder.finish() - incomingContext.map { ctx ⇒ val playExtension = Kamon(Play)(ctx.system) - recordHttpServerMetrics(result, ctx.name, playExtension) + recordHttpServerMetrics(result.header, ctx.name, playExtension) if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token) else result }.getOrElse(result) @@ -69,16 +69,11 @@ class RequestInstrumentation { pjp.proceed(Array(EssentialAction(essentialAction))) } - private def recordHttpServerMetrics(result: Result, traceName: String, playExtension: PlayExtension): Unit = - playExtension.httpServerMetrics.recordResponse(traceName, result.header.status.toString, 1L) - - @Around("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") - def aroundOnError(pjp: ProceedingJoinPoint, request: TraceContextAware, ex: Throwable): Any = request.traceContext match { - case None ⇒ pjp.proceed() - case Some(ctx) ⇒ { - val actorSystem = ctx.system - Kamon(Play)(actorSystem).publishErrorMessage(actorSystem, ex.getMessage, ex) - pjp.proceed() - } + @Before("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") + def beforeOnError(request: TraceContextAware, ex: Throwable): Unit = request.traceContext.map { + ctx ⇒ recordHttpServerMetrics(InternalServerError.header, ctx.name, Kamon(Play)(ctx.system)) } + + private def recordHttpServerMetrics(header: ResponseHeader, traceName: String, playExtension: PlayExtension): Unit = + playExtension.httpServerMetrics.recordResponse(traceName, header.status.toString) } diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala index 2862ba19..3e4d6110 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala @@ -16,15 +16,16 @@ package kamon.play.instrumentation -import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect } -import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.TraceRecorder import kamon.metric.TraceMetrics.HttpClientRequest -import play.api.libs.ws.WSRequest -import scala.concurrent.Future -import play.api.libs.ws.WSResponse -import scala.util.{ Failure, Success } +import kamon.trace.{ TraceContext, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } +import play.api.libs.ws.ning.NingWSRequest +import play.api.libs.ws.{ WSRequest, WSResponse } +import play.libs.Akka + import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future @Aspect class WSInstrumentation { @@ -34,27 +35,36 @@ class WSInstrumentation { @Around("onExecuteRequest(request)") def aroundExecuteRequest(pjp: ProceedingJoinPoint, request: WSRequest): Any = { - import WSInstrumentation._ - val completionHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request)) + import kamon.play.instrumentation.WSInstrumentation._ - val response = pjp.proceed().asInstanceOf[Future[WSResponse]] + withOrNewTraceContext(TraceRecorder.currentContext)(request) { + val response = pjp.proceed().asInstanceOf[Future[WSResponse]] + val segmentHandle = TraceRecorder.startSegment(HttpClientRequest(request.url), basicRequestAttributes(request)) - response.onComplete { - case Failure(t) ⇒ completionHandle.map(_.finish(Map("completed-with-error" -> t.getMessage))) - case Success(_) ⇒ completionHandle.map(_.finish(Map.empty)) + response.map { + r ⇒ + segmentHandle.map(_.finish()) + TraceRecorder.finish() + } + response } - - response } } object WSInstrumentation { + def uri(request: WSRequest): java.net.URI = request.asInstanceOf[NingWSRequest].builder.build().getURI + def basicRequestAttributes(request: WSRequest): Map[String, String] = { Map[String, String]( - "host" -> request.header("host").getOrElse("Unknown"), - "path" -> request.method) + "host" -> uri(request).getHost, + "path" -> uri(request).getPath, + "method" -> request.method) } -} + def withOrNewTraceContext[T](context: Option[TraceContext])(request: WSRequest)(thunk: ⇒ T): T = { + if (context.isDefined) TraceRecorder.withTraceContext(context) { thunk } + else TraceRecorder.withNewTraceContext(request.url, metadata = basicRequestAttributes(request)) { thunk }(Akka.system()) + } +} \ No newline at end of file diff --git a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala index fc195580..00adb99b 100644 --- a/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala @@ -15,13 +15,14 @@ package kamon.play -import scala.concurrent.duration._ import kamon.Kamon import kamon.http.HttpServerMetrics import kamon.metric.{ CollectionContext, Metrics } import kamon.play.action.TraceName import kamon.trace.{ TraceLocal, TraceRecorder } import org.scalatestplus.play._ +import play.api.DefaultGlobal +import play.api.http.Writeable import play.api.libs.concurrent.Execution.Implicits.defaultContext import play.api.mvc.Results.Ok import play.api.mvc._ @@ -29,6 +30,7 @@ import play.api.test.Helpers._ import play.api.test._ import play.libs.Akka +import scala.concurrent.duration._ import scala.concurrent.{ Await, Future } class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { @@ -49,6 +51,11 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { Action { Results.NotFound } + case ("GET", "/error") ⇒ + Action { + throw new Exception("This page generates an error!") + Ok("This page will generate an error!") + } case ("GET", "/redirect") ⇒ Action { Results.Redirect("/redirected", MOVED_PERMANENTLY) @@ -125,11 +132,17 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { Await.result(route(FakeRequest(GET, "/notFound").withHeaders(traceTokenHeader)).get, 10 seconds) } + for (repetition ← 1 to 5) { + Await.result(routeWithOnError(FakeRequest(GET, "/error").withHeaders(traceTokenHeader)).get, 10 seconds) + } + val snapshot = Kamon(Metrics)(Akka.system()).register(HttpServerMetrics, HttpServerMetrics.Factory).get.collect(collectionContext) snapshot.countsPerTraceAndStatusCode("GET: /default")("200").count must be(10) snapshot.countsPerTraceAndStatusCode("GET: /notFound")("404").count must be(5) + snapshot.countsPerTraceAndStatusCode("GET: /error")("500").count must be(5) snapshot.countsPerStatusCode("200").count must be(10) snapshot.countsPerStatusCode("404").count must be(5) + snapshot.countsPerStatusCode("500").count must be(5) } } @@ -151,5 +164,13 @@ class RequestInstrumentationSpec extends PlaySpec with OneServerPerSuite { } } } + + def routeWithOnError[T](req: Request[T])(implicit w: Writeable[T]): Option[Future[Result]] = { + route(req).map { result ⇒ + result.recoverWith { + case t: Throwable ⇒ DefaultGlobal.onError(req, t) + } + } + } } diff --git a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala index 775d3e26..cbd95db3 100644 --- a/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala +++ b/kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala @@ -16,70 +16,64 @@ package kamon.play +import kamon.Kamon +import kamon.metric.TraceMetrics.{ HttpClientRequest, TraceMetricsSnapshot } +import kamon.metric.{ Metrics, TraceMetrics } +import org.scalatest.{ Matchers, WordSpecLike } +import org.scalatestplus.play.OneServerPerSuite +import play.api.libs.ws.WS import play.api.mvc.Action import play.api.mvc.Results.Ok -import play.api.libs.ws.WS -import org.scalatestplus.play.OneServerPerSuite -import play.api.test._ import play.api.test.Helpers._ -import akka.actor.ActorSystem -import akka.testkit.{ TestKitBase, TestProbe } +import play.api.test._ +import play.libs.Akka -import com.typesafe.config.ConfigFactory -import org.scalatest.{ Matchers, WordSpecLike } -import kamon.Kamon -import kamon.metric.{ TraceMetrics, Metrics } -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.TraceMetrics.ElapsedTime +import scala.concurrent.Await +import scala.concurrent.duration._ -class WSInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with OneServerPerSuite { +class WSInstrumentationSpec extends WordSpecLike with Matchers with OneServerPerSuite { System.setProperty("config.file", "./kamon-play/src/test/resources/conf/application.conf") - import scala.collection.immutable.StringLike._ - implicit lazy val system: ActorSystem = ActorSystem("play-ws-instrumentation-spec", ConfigFactory.parseString( - """ - |akka { - | loglevel = ERROR - |} - | - |kamon { - | metrics { - | tick-interval = 2 seconds - | - | filters = [ - | { - | trace { - | includes = [ "*" ] - | excludes = [] - | } - | } - | ] - | } - |} - """.stripMargin)) - implicit override lazy val app = FakeApplication(withRoutes = { - case ("GET", "/async") ⇒ Action { Ok("ok") } + case ("GET", "/async") ⇒ Action { Ok("ok") } + case ("GET", "/outside") ⇒ Action { Ok("ok") } + case ("GET", "/inside") ⇒ callWSinsideController("http://localhost:19001/async") }) "the WS instrumentation" should { - "respond to the Async Action and complete the WS request" in { + "propagate the TraceContext outside an Action and complete the WS request" in { + Await.result(WS.url("http://localhost:19001/outside").get(), 10 seconds) + + val snapshot = takeSnapshotOf("http://localhost:19001/outside") + snapshot.elapsedTime.numberOfMeasurements should be(1) + snapshot.segments.size should be(1) + snapshot.segments(HttpClientRequest("http://localhost:19001/outside")).numberOfMeasurements should be(1) + } - val metricListener = TestProbe() - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", metricListener.ref, permanently = true) - metricListener.expectMsgType[TickMetricSnapshot] + "propagate the TraceContext inside an Action and complete the WS request" in { + Await.result(route(FakeRequest(GET, "/inside")).get, 10 seconds) + + val snapshot = takeSnapshotOf("GET: /inside") + snapshot.elapsedTime.numberOfMeasurements should be(2) + snapshot.segments.size should be(1) + snapshot.segments(HttpClientRequest("http://localhost:19001/async")).numberOfMeasurements should be(1) + } + + } + + def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { + val recorder = Kamon(Metrics)(Akka.system()).register(TraceMetrics(traceName), TraceMetrics.Factory) + val collectionContext = Kamon(Metrics)(Akka.system()).buildDefaultCollectionContext + recorder.get.collect(collectionContext) + } - val response = await(WS.url("http://localhost:19001/async").get()) - response.status should be(OK) + def callWSinsideController(url: String) = Action.async { + import play.api.libs.concurrent.Execution.Implicits.defaultContext + import play.api.Play.current - // val tickSnapshot = metricListener.expectMsgType[TickMetricSnapshot] - // val traceMetrics = tickSnapshot.metrics.find { case (k, v) ⇒ k.name.contains("async") } map (_._2.metrics) - // traceMetrics should not be empty - // - // traceMetrics map { metrics ⇒ - // metrics(ElapsedTime).numberOfMeasurements should be(1L) - // } + WS.url(url).get().map { response ⇒ + Ok("Ok") } } } \ No newline at end of file -- cgit v1.2.3