From e148933747e8fde17b6ac324df0dee70b8cb9ebc Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 10 Oct 2013 18:52:09 -0300 Subject: complete spray client instrumentation with experimental branch --- kamon-core/src/main/resources/META-INF/aop.xml | 1 + kamon-core/src/main/resources/logback.xml | 12 ++++ kamon-core/src/main/scala/kamon/TraceContext.scala | 2 +- .../SprayServerInstrumentation.scala | 82 +++++++++++++++++----- .../scala/kamon/newrelic/NewRelicReporting.scala | 23 ++++-- .../src/main/scala/kamon/trace/UowTracing.scala | 15 +++- .../main/scala/test/SimpleRequestProcessor.scala | 14 +++- kamon-core/src/test/resources/logback.xml | 12 ---- kamon-core/src/test/resources/newrelic.yml | 2 +- project/Build.scala | 2 +- project/Dependencies.scala | 4 +- project/NewRelic.scala | 2 +- 12 files changed, 125 insertions(+), 46 deletions(-) create mode 100644 kamon-core/src/main/resources/logback.xml delete mode 100644 kamon-core/src/test/resources/logback.xml diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 349fc56d..f13effb9 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -26,6 +26,7 @@ + diff --git a/kamon-core/src/main/resources/logback.xml b/kamon-core/src/main/resources/logback.xml new file mode 100644 index 00000000..2ae1e3bd --- /dev/null +++ b/kamon-core/src/main/resources/logback.xml @@ -0,0 +1,12 @@ + + + + %date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 155b7760..63cdb488 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -17,7 +17,7 @@ object TraceContext { def apply()(implicit system: ActorSystem) = { val n = traceIdCounter.incrementAndGet() - val actor = system.actorOf(UowTraceAggregator.props(reporter, 5 seconds), s"tracer-${n}") + val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}") actor ! Start() new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 5117e7e7..2bd8643c 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -1,11 +1,14 @@ package kamon.instrumentation -import org.aspectj.lang.annotation.{DeclareMixin, After, Pointcut, Aspect} +import org.aspectj.lang.annotation._ import kamon.{TraceContext, Tracer} -import kamon.trace.UowTracing.{Finish, Rename} -import spray.http.HttpRequest -import spray.can.server.{OpenRequest, OpenRequestComponent} +import kamon.trace.UowTracing._ import kamon.trace.context.TracingAwareContext +import org.aspectj.lang.ProceedingJoinPoint +import spray.http.HttpRequest +import kamon.trace.UowTracing.Finish +import kamon.trace.UowTracing.Rename +import spray.http.HttpHeaders.Host //import spray.can.client.HttpHostConnector.RequestContext @@ -13,35 +16,45 @@ trait ContextAware { def traceContext: Option[TraceContext] } +trait TimedContextAware { + def timestamp: Long + def traceContext: Option[TraceContext] +} + @Aspect class SprayOpenRequestContextTracing { @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { val traceContext: Option[TraceContext] = Tracer.traceContext.value } + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware { + val timestamp: Long = System.nanoTime() + val traceContext: Option[TraceContext] = Tracer.traceContext.value + } } @Aspect class SprayServerInstrumentation { - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(enclosing, request, closeAfterResponseCompletion, timestamp)") - def openRequestInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {} + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") + def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} - @After("openRequestInit(openRequest, enclosing, request, closeAfterResponseCompletion, timestamp)") - def afterInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = { + @After("openRequestInit(openRequest, request)") + def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { Tracer.start - val discard = openRequest.asInstanceOf[ContextAware].traceContext + openRequest.traceContext Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) } @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") - def openRequestCreation(openRequest: OpenRequest): Unit = {} + def openRequestCreation(openRequest: ContextAware): Unit = {} @After("openRequestCreation(openRequest)") - def afterFinishingRequest(openRequest: OpenRequest): Unit = { - val original = openRequest.asInstanceOf[ContextAware].traceContext - + def afterFinishingRequest(openRequest: ContextAware): Unit = { + val original = openRequest.traceContext Tracer.context().map(_.tracer ! Finish()) if(Tracer.context() != original) { @@ -49,13 +62,46 @@ class SprayServerInstrumentation { } } - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx)") - def requestRecordInit(ctx: TracingAwareContext): Unit = {} + @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *)") + def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} - @After("requestRecordInit(ctx)") - def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { + @After("requestRecordInit(ctx, request)") + def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - ctx.traceContext + for{ + tctx <- ctx.traceContext + host <- request.header[Host] + } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host) + } + + + + @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") + def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {} + + @Around("dispatchToCommander(requestContext, message)") + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = { + println("Completing the request with context: " + requestContext.traceContext) + + Tracer.traceContext.withValue(requestContext.traceContext) { + requestContext.traceContext.map { + tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp) + } + pjp.proceed() + } + + } + + + @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") + def copyingRequestContext(old: TimedContextAware): Unit = {} + + @Around("copyingRequestContext(old)") + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = { + println("Instrumenting the request context copy.") + Tracer.traceContext.withValue(old.traceContext) { + pjp.proceed() + } } } diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 31e50cfe..6629e164 100644 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -2,8 +2,9 @@ package kamon.newrelic import akka.actor.Actor import kamon.trace.UowTrace -import com.newrelic.api.agent.{Trace, NewRelic} -import kamon.trace.UowTracing.WebExternal +import com.newrelic.api.agent.{Response, Request, Trace, NewRelic} +import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart} +import java.util class NewRelicReporting extends Actor { @@ -11,7 +12,6 @@ class NewRelicReporting extends Actor { case trace: UowTrace => recordTransaction(trace) } - //@Trace def recordTransaction(uowTrace: UowTrace): Unit = { val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) @@ -20,16 +20,25 @@ class NewRelicReporting extends Actor { NewRelic.recordMetric("HttpDispatcher", time.toFloat) uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => - val external = ((webExternalTrace.end - webExternalTrace.start)/1E9).toFloat - NewRelic.recordMetric(s"External/all", external) - NewRelic.recordMetric(s"External/allWeb", external) + val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/" + "WebTransaction/Custom" + uowTrace.name, external) + NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) + } +/* + + val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) + + + def measureExternal(segments: Seq[WebExternal]): Long = { } + + NewRelic.recordMetric(s"External/all", external) + NewRelic.recordMetric(s"External/allWeb", external)*/ + } } diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala index c794656d..9ba3813a 100644 --- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -2,8 +2,13 @@ package kamon.trace import akka.actor._ import scala.concurrent.duration.Duration +import kamon.trace.UowTracing._ +import scala.Some +import kamon.trace.UowTracing.WebExternalFinish import kamon.trace.UowTracing.Finish import kamon.trace.UowTracing.Rename +import kamon.trace.UowTrace +import kamon.trace.UowTracing.WebExternalStart import scala.Some sealed trait UowSegment { @@ -18,7 +23,9 @@ object UowTracing { case class Start() extends AutoTimestamp case class Finish() extends AutoTimestamp case class Rename(name: String) extends AutoTimestamp - case class WebExternal(start: Long, end: Long, host: String) extends AutoTimestamp + case class WebExternalStart(id: Long, host: String) extends AutoTimestamp + case class WebExternalFinish(id: Long) extends AutoTimestamp + case class WebExternal(start: Long, finish: Long, host: String) extends AutoTimestamp } case class UowTrace(name: String, segments: Seq[UowSegment]) @@ -30,8 +37,14 @@ class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) exte var name: Option[String] = None var segments: Seq[UowSegment] = Nil + var pendingExternal = List[WebExternalStart]() + def receive = { case finish: Finish => segments = segments :+ finish; finishTracing() + case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { + segments = segments :+ WebExternal(start.timestamp, finish.timestamp, start.host) + }) case Rename(newName) => name = Some(newName) case segment: UowSegment => segments = segments :+ segment case ReceiveTimeout => diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala index 7d4cec52..ef657f24 100644 --- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala @@ -24,8 +24,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil startServer(interface = "localhost", port = 9090) { get { path("test"){ - complete { - pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok") + uow { + complete { + val futures = pipeline(Get("http://10.254.10.57:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.10.57:8000/")).map(r => "Ok") :: Nil + + Future.sequence(futures).map(l => "Ok") + } } } ~ path("reply" / Segment) { reqID => @@ -45,6 +49,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil dynamic { complete(Future { "OK" }) } + } ~ + path("error") { + complete { + throw new NullPointerException + "okk" + } } } } diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/test/resources/logback.xml deleted file mode 100644 index 2ae1e3bd..00000000 --- a/kamon-core/src/test/resources/logback.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - %date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n - - - - - - - - diff --git a/kamon-core/src/test/resources/newrelic.yml b/kamon-core/src/test/resources/newrelic.yml index 1b1ad53b..77923e9c 100644 --- a/kamon-core/src/test/resources/newrelic.yml +++ b/kamon-core/src/test/resources/newrelic.yml @@ -54,7 +54,7 @@ common: &default_settings # Log all data to and from New Relic in plain text. # This setting is dynamic, so changes do not require restarting your application. # Default is false. - #audit_mode: true + audit_mode: true # The number of log files to use. # Default is 1. diff --git a/project/Build.scala b/project/Build.scala index 525cf96c..23880c33 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -24,7 +24,7 @@ object Build extends Build { .settings( libraryDependencies ++= compile(akkaActor, akkaAgent, aspectJ, aspectJWeaver, metrics, newrelic, sprayJson) ++ - provided(sprayCan, sprayClient, sprayRouting, logback, akkaSlf4j) ++ + compile(sprayCan, sprayClient, sprayRouting, logback, akkaSlf4j) ++ test(scalatest, akkaTestKit, sprayTestkit, logback, akkaSlf4j)) //.dependsOn(kamonDashboard) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1a49d1d4..b940edf6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,7 +7,7 @@ object Dependencies { "spray nightlies repo" at "http://nightlies.spray.io" ) - val sprayVersion = "1.2-20130801" + val sprayVersion = "1.2-M8" val akkaVersion = "2.2.0" val sprayCan = "io.spray" % "spray-can" % sprayVersion @@ -25,7 +25,7 @@ object Dependencies { val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" val aspectJWeaver = "org.aspectj" % "aspectjweaver" % "1.7.2" val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0" - val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0" + val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "3.0.1" def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") diff --git a/project/NewRelic.scala b/project/NewRelic.scala index e662d4e3..2fc590cd 100644 --- a/project/NewRelic.scala +++ b/project/NewRelic.scala @@ -8,6 +8,6 @@ object NewRelic { lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq( javaOptions in run <++= jvmOptions in newrelic, - newrelicVersion in newrelic := "2.20.0" + newrelicVersion in newrelic := "3.0.1" ) } -- cgit v1.2.3