aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2013-11-29 09:59:37 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2013-11-29 09:59:37 -0300
commit231b5b0fc61438ec96e309436295e86808abdbb5 (patch)
tree10288215da3562b5d523b4d2d97b703056cfe4fa
parentb80e4bd135ed50b01def5a4c2f7c42983efb54fd (diff)
downloadKamon-231b5b0fc61438ec96e309436295e86808abdbb5.tar.gz
Kamon-231b5b0fc61438ec96e309436295e86808abdbb5.tar.bz2
Kamon-231b5b0fc61438ec96e309436295e86808abdbb5.zip
instrumentation is back in place
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala19
-rw-r--r--kamon-spray/src/main/resources/META-INF/aop.xml1
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala91
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/Segments.scala11
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/Trace.scala9
5 files changed, 80 insertions, 51 deletions
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index 1c1dba4f..53cd28ff 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -52,6 +52,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
Future.sequence(futures).map(l ⇒ "Ok")
}
}
+ } ~ {
+ path("site") {
+ complete {
+ pipeline(Get("http://localhost:4000/"))
+ }
+ }
} ~
path("reply" / Segment) { reqID ⇒
uow {
@@ -116,3 +122,16 @@ class Replier extends Actor with ActorLogging {
sender ! anything
}
}
+
+object PingPong extends App {
+ val system = ActorSystem()
+ val pinger = system.actorOf(Props(new Actor {
+ def receive: Actor.Receive = { case "pong" => sender ! "ping" }
+ }))
+ val ponger = system.actorOf(Props(new Actor {
+ def receive: Actor.Receive = { case "ping" => sender ! "pong" }
+ }))
+
+ pinger.tell("pong", ponger)
+
+}
diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml
index afbbb8c0..8324617b 100644
--- a/kamon-spray/src/main/resources/META-INF/aop.xml
+++ b/kamon-spray/src/main/resources/META-INF/aop.xml
@@ -5,6 +5,7 @@
<aspects>
<aspect name="spray.can.server.ServerRequestTracing"/>
+ <aspect name="spray.can.client.ClientRequestTracing"/>
<include within="spray..*"/>
</aspects>
</aspectj>
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala
index e81dc0a4..ab6f1f9c 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala
@@ -18,79 +18,68 @@ package spray.can.client
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import spray.http.HttpRequest
+import spray.http.{HttpMessageEnd, HttpRequest}
import spray.http.HttpHeaders.Host
-import kamon.trace._
-import spray.http.HttpRequest
-import kamon.trace.TraceContext
-import kamon.trace.Segments.{HttpClientRequest, Start}
+import kamon.trace.{TraceContext, Trace, Segments}
+import kamon.trace.Segments.{ContextAndSegmentCompletionAware, HttpClientRequest}
import kamon.trace.Trace.SegmentCompletionHandle
-trait SegmentCompletionHandleAware {
- var completionHandle: Option[SegmentCompletionHandle]
-}
-
-trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware
-
@Aspect
-class SprayOpenRequestContextTracing {
+class ClientRequestTracing {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixinContextAwareToRequestContext: ContextAndSegmentCompletionAware = new ContextAndSegmentCompletionAware {
- def traceContext: Option[TraceContext] = Trace.context()
+ def mixin: ContextAndSegmentCompletionAware = new ContextAndSegmentCompletionAware {
+ val traceContext: Option[TraceContext] = Trace.context()
var completionHandle: Option[SegmentCompletionHandle] = None
}
-}
-@Aspect
-class SprayServerInstrumentation {
@Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
- def requestRecordInit(ctx: ContextAndSegmentCompletionAware, request: HttpRequest): Unit = {}
+ def requestContextCreation(ctx: ContextAndSegmentCompletionAware, request: HttpRequest): Unit = {}
+
+ @After("requestContextCreation(ctx, request)")
+ def afterRequestContextCreation(ctx: ContextAndSegmentCompletionAware, request: HttpRequest): Unit = {
+ // The read to ctx.completionHandle should take care of initializing the aspect timely.
+ if(ctx.completionHandle.isEmpty) {
+ val requestAttributes = Map[String, String](
+ "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
+ "path" -> request.uri.path.toString(),
+ "method" -> request.method.toString()
+ )
+ val completionHandle = Trace.startSegment(category = HttpClientRequest, attributes = requestAttributes)
+ ctx.completionHandle = Some(completionHandle)
+ }
+ }
- @After("requestRecordInit(ctx, request)")
- def whenCreatedRequestRecord(ctx: ContextAndSegmentCompletionAware, request: HttpRequest): Unit = {
- val completionHandle = Trace.startSegment(Segments.Start(HttpClientRequest))
- // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
- ctx.traceContext
- ctx.completionHandle = Some(completionHandle)
+ @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
+ def copyingRequestContext(old: ContextAndSegmentCompletionAware): Unit = {}
+
+ @Around("copyingRequestContext(old)")
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: ContextAndSegmentCompletionAware) = {
+ Trace.withContext(old.traceContext) {
+ pjp.proceed()
+ }
}
+
@Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
- def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {}
+ def dispatchToCommander(requestContext: ContextAndSegmentCompletionAware, message: Any): Unit = {}
@Around("dispatchToCommander(requestContext, message)")
def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: ContextAndSegmentCompletionAware, message: Any) = {
- println("Completing the request with context: " + requestContext.traceContext)
+ requestContext.traceContext match {
+ case ctx @ Some(_) =>
+ Trace.withContext(ctx) {
+ if(message.isInstanceOf[HttpMessageEnd])
+ requestContext.completionHandle.map(_.complete(Segments.End()))
- requestContext.completionHandle.map(_.complete(Segments.End()))
- /*Tracer.context.withValue(requestContext.traceContext) {
- requestContext.traceContext.map {
- tctx => //tctx.tracer ! WebExternalFinish(requestContext.timestamp)
- }
- pjp.proceed()
- }*/
+ pjp.proceed()
+ }
+ case None => pjp.proceed()
+ }
}
- @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
- def copyingRequestContext(old: TimedContextAware): Unit = {}
-
- @Around("copyingRequestContext(old)")
- def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = {
- println("Instrumenting the request context copy.")
- /*Tracer.traceContext.withValue(old.traceContext) {
- pjp.proceed()
- }*/
- }
-}
-@Aspect
-class SprayRequestContextTracing {
-
- @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixin: ContextAware = new ContextAware {
- val traceContext: Option[TraceContext] = Trace.context()
- }
} \ No newline at end of file
diff --git a/kamon-trace/src/main/scala/kamon/trace/Segments.scala b/kamon-trace/src/main/scala/kamon/trace/Segments.scala
index 0e1778bf..7cc20181 100644
--- a/kamon-trace/src/main/scala/kamon/trace/Segments.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/Segments.scala
@@ -16,6 +16,8 @@
package kamon.trace
+import kamon.trace.Trace.SegmentCompletionHandle
+
object Segments {
trait Category
@@ -27,4 +29,13 @@ object Segments {
case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
case class Segment(start: Start, end: End)
+
+
+
+
+ trait SegmentCompletionHandleAware {
+ var completionHandle: Option[SegmentCompletionHandle]
+ }
+
+ trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware
}
diff --git a/kamon-trace/src/main/scala/kamon/trace/Trace.scala b/kamon-trace/src/main/scala/kamon/trace/Trace.scala
index 02947dd6..6eff69e5 100644
--- a/kamon-trace/src/main/scala/kamon/trace/Trace.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/Trace.scala
@@ -61,9 +61,18 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
// TODO: FIX
def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace), tranid.getAndIncrement, name)
+ def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = {
+ val start = Segments.Start(category, description, attributes)
+ SegmentCompletionHandle(start)
+ }
+
def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start)
case class SegmentCompletionHandle(start: Segments.Start) {
+ def complete(): Unit = {
+ val end = Segments.End()
+ println(s"Completing the Segment: $start - $end")
+ }
def complete(end: Segments.End): Unit = {
println(s"Completing the Segment: $start - $end")
}