aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-10-10 18:52:09 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-10-10 18:52:09 -0300
commite148933747e8fde17b6ac324df0dee70b8cb9ebc (patch)
tree6cd084809e8184c74052bcb4201adb3f8e5e4004 /kamon-core
parentdbac9011999384a57c0ecc2c9ab5c49870cdce45 (diff)
downloadKamon-e148933747e8fde17b6ac324df0dee70b8cb9ebc.tar.gz
Kamon-e148933747e8fde17b6ac324df0dee70b8cb9ebc.tar.bz2
Kamon-e148933747e8fde17b6ac324df0dee70b8cb9ebc.zip
complete spray client instrumentation with experimental branch
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml1
-rw-r--r--kamon-core/src/main/resources/logback.xml (renamed from kamon-core/src/test/resources/logback.xml)0
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala82
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala23
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala15
-rw-r--r--kamon-core/src/main/scala/test/SimpleRequestProcessor.scala14
-rw-r--r--kamon-core/src/test/resources/newrelic.yml2
8 files changed, 109 insertions, 30 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 349fc56d..f13effb9 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -26,6 +26,7 @@
<include within="*"/>
<exclude within="javax..*"/>
+ <exclude within="com.newrelic..*"/>
<exclude within="org.aspectj..*"/>
<exclude within="scala..*"/>
<exclude within="scalaz..*"/>
diff --git a/kamon-core/src/test/resources/logback.xml b/kamon-core/src/main/resources/logback.xml
index 2ae1e3bd..2ae1e3bd 100644
--- a/kamon-core/src/test/resources/logback.xml
+++ b/kamon-core/src/main/resources/logback.xml
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 155b7760..63cdb488 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -17,7 +17,7 @@ object TraceContext {
def apply()(implicit system: ActorSystem) = {
val n = traceIdCounter.incrementAndGet()
- val actor = system.actorOf(UowTraceAggregator.props(reporter, 5 seconds), s"tracer-${n}")
+ val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}")
actor ! Start()
new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
index 5117e7e7..2bd8643c 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
@@ -1,11 +1,14 @@
package kamon.instrumentation
-import org.aspectj.lang.annotation.{DeclareMixin, After, Pointcut, Aspect}
+import org.aspectj.lang.annotation._
import kamon.{TraceContext, Tracer}
-import kamon.trace.UowTracing.{Finish, Rename}
-import spray.http.HttpRequest
-import spray.can.server.{OpenRequest, OpenRequestComponent}
+import kamon.trace.UowTracing._
import kamon.trace.context.TracingAwareContext
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http.HttpRequest
+import kamon.trace.UowTracing.Finish
+import kamon.trace.UowTracing.Rename
+import spray.http.HttpHeaders.Host
//import spray.can.client.HttpHostConnector.RequestContext
@@ -13,35 +16,45 @@ trait ContextAware {
def traceContext: Option[TraceContext]
}
+trait TimedContextAware {
+ def timestamp: Long
+ def traceContext: Option[TraceContext]
+}
+
@Aspect
class SprayOpenRequestContextTracing {
@DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
def mixinContextAwareToOpenRequest: ContextAware = new ContextAware {
val traceContext: Option[TraceContext] = Tracer.traceContext.value
}
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware {
+ val timestamp: Long = System.nanoTime()
+ val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ }
}
@Aspect
class SprayServerInstrumentation {
- @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(enclosing, request, closeAfterResponseCompletion, timestamp)")
- def openRequestInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {}
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
+ def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {}
- @After("openRequestInit(openRequest, enclosing, request, closeAfterResponseCompletion, timestamp)")
- def afterInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {
+ @After("openRequestInit(openRequest, request)")
+ def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = {
Tracer.start
- val discard = openRequest.asInstanceOf[ContextAware].traceContext
+ openRequest.traceContext
Tracer.context().map(_.tracer ! Rename(request.uri.path.toString()))
}
@Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)")
- def openRequestCreation(openRequest: OpenRequest): Unit = {}
+ def openRequestCreation(openRequest: ContextAware): Unit = {}
@After("openRequestCreation(openRequest)")
- def afterFinishingRequest(openRequest: OpenRequest): Unit = {
- val original = openRequest.asInstanceOf[ContextAware].traceContext
-
+ def afterFinishingRequest(openRequest: ContextAware): Unit = {
+ val original = openRequest.traceContext
Tracer.context().map(_.tracer ! Finish())
if(Tracer.context() != original) {
@@ -49,13 +62,46 @@ class SprayServerInstrumentation {
}
}
- @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx)")
- def requestRecordInit(ctx: TracingAwareContext): Unit = {}
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *)")
+ def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {}
- @After("requestRecordInit(ctx)")
- def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = {
+ @After("requestRecordInit(ctx, request)")
+ def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = {
// Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
- ctx.traceContext
+ for{
+ tctx <- ctx.traceContext
+ host <- request.header[Host]
+ } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)
+ }
+
+
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
+ def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {}
+
+ @Around("dispatchToCommander(requestContext, message)")
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = {
+ println("Completing the request with context: " + requestContext.traceContext)
+
+ Tracer.traceContext.withValue(requestContext.traceContext) {
+ requestContext.traceContext.map {
+ tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp)
+ }
+ pjp.proceed()
+ }
+
+ }
+
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
+ def copyingRequestContext(old: TimedContextAware): Unit = {}
+
+ @Around("copyingRequestContext(old)")
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = {
+ println("Instrumenting the request context copy.")
+ Tracer.traceContext.withValue(old.traceContext) {
+ pjp.proceed()
+ }
}
}
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
index 31e50cfe..6629e164 100644
--- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
+++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
@@ -2,8 +2,9 @@ package kamon.newrelic
import akka.actor.Actor
import kamon.trace.UowTrace
-import com.newrelic.api.agent.{Trace, NewRelic}
-import kamon.trace.UowTracing.WebExternal
+import com.newrelic.api.agent.{Response, Request, Trace, NewRelic}
+import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart}
+import java.util
class NewRelicReporting extends Actor {
@@ -11,7 +12,6 @@ class NewRelicReporting extends Actor {
case trace: UowTrace => recordTransaction(trace)
}
- //@Trace
def recordTransaction(uowTrace: UowTrace): Unit = {
val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9)
@@ -20,16 +20,25 @@ class NewRelicReporting extends Actor {
NewRelic.recordMetric("HttpDispatcher", time.toFloat)
uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace =>
- val external = ((webExternalTrace.end - webExternalTrace.start)/1E9).toFloat
- NewRelic.recordMetric(s"External/all", external)
- NewRelic.recordMetric(s"External/allWeb", external)
+ val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat
NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external)
NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/" + "WebTransaction/Custom" + uowTrace.name, external)
+ NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
+ }
+/*
+
+ val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp)
+
+
+ def measureExternal(segments: Seq[WebExternal]): Long = {
}
+
+ NewRelic.recordMetric(s"External/all", external)
+ NewRelic.recordMetric(s"External/allWeb", external)*/
+
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
index c794656d..9ba3813a 100644
--- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
+++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
@@ -2,8 +2,13 @@ package kamon.trace
import akka.actor._
import scala.concurrent.duration.Duration
+import kamon.trace.UowTracing._
+import scala.Some
+import kamon.trace.UowTracing.WebExternalFinish
import kamon.trace.UowTracing.Finish
import kamon.trace.UowTracing.Rename
+import kamon.trace.UowTrace
+import kamon.trace.UowTracing.WebExternalStart
import scala.Some
sealed trait UowSegment {
@@ -18,7 +23,9 @@ object UowTracing {
case class Start() extends AutoTimestamp
case class Finish() extends AutoTimestamp
case class Rename(name: String) extends AutoTimestamp
- case class WebExternal(start: Long, end: Long, host: String) extends AutoTimestamp
+ case class WebExternalStart(id: Long, host: String) extends AutoTimestamp
+ case class WebExternalFinish(id: Long) extends AutoTimestamp
+ case class WebExternal(start: Long, finish: Long, host: String) extends AutoTimestamp
}
case class UowTrace(name: String, segments: Seq[UowSegment])
@@ -30,8 +37,14 @@ class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) exte
var name: Option[String] = None
var segments: Seq[UowSegment] = Nil
+ var pendingExternal = List[WebExternalStart]()
+
def receive = {
case finish: Finish => segments = segments :+ finish; finishTracing()
+ case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes
+ case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => {
+ segments = segments :+ WebExternal(start.timestamp, finish.timestamp, start.host)
+ })
case Rename(newName) => name = Some(newName)
case segment: UowSegment => segments = segments :+ segment
case ReceiveTimeout =>
diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
index 7d4cec52..ef657f24 100644
--- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
@@ -24,8 +24,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
startServer(interface = "localhost", port = 9090) {
get {
path("test"){
- complete {
- pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok")
+ uow {
+ complete {
+ val futures = pipeline(Get("http://10.254.10.57:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.10.57:8000/")).map(r => "Ok") :: Nil
+
+ Future.sequence(futures).map(l => "Ok")
+ }
}
} ~
path("reply" / Segment) { reqID =>
@@ -45,6 +49,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
dynamic {
complete(Future { "OK" })
}
+ } ~
+ path("error") {
+ complete {
+ throw new NullPointerException
+ "okk"
+ }
}
}
}
diff --git a/kamon-core/src/test/resources/newrelic.yml b/kamon-core/src/test/resources/newrelic.yml
index 1b1ad53b..77923e9c 100644
--- a/kamon-core/src/test/resources/newrelic.yml
+++ b/kamon-core/src/test/resources/newrelic.yml
@@ -54,7 +54,7 @@ common: &default_settings
# Log all data to and from New Relic in plain text.
# This setting is dynamic, so changes do not require restarting your application.
# Default is false.
- #audit_mode: true
+ audit_mode: true
# The number of log files to use.
# Default is 1.