aboutsummaryrefslogtreecommitdiff
path: root/kamon-spray
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-11-05 18:38:39 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-11-05 18:38:39 -0300
commit2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 (patch)
tree56c4ad1f025c9144376cd4463ad4d4a23e37b571 /kamon-spray
parent5127c3bb83cd6fe90e071720d995cfb53d913e6a (diff)
downloadKamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.gz
Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.bz2
Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.zip
basic separation of concerns between sub-projects
Diffstat (limited to 'kamon-spray')
-rw-r--r--kamon-spray/src/main/resources/META-INF/aop.xml10
-rw-r--r--kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala59
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala27
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala56
-rw-r--r--kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala49
5 files changed, 155 insertions, 46 deletions
diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..afbbb8c0
--- /dev/null
+++ b/kamon-spray/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,10 @@
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <weaver options="-verbose -showWeaveInfo"/>
+
+ <aspects>
+ <aspect name="spray.can.server.ServerRequestTracing"/>
+ <include within="spray..*"/>
+ </aspects>
+</aspectj>
diff --git a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
index 743769e2..08cb53ff 100644
--- a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
+++ b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
@@ -1,61 +1,28 @@
-package kamon.instrumentation
+package spray.can.server
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import spray.http.HttpRequest
import spray.http.HttpHeaders.Host
+import kamon.trace.{TraceContext, Trace, ContextAware, TimedContextAware}
//import spray.can.client.HttpHostConnector.RequestContext
-trait ContextAware {
- def traceContext: Option[TraceContext]
-}
-trait TimedContextAware {
- def timestamp: Long
- def traceContext: Option[TraceContext]
-}
@Aspect
class SprayOpenRequestContextTracing {
- @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
- def mixinContextAwareToOpenRequest: ContextAware = new ContextAware {
- val traceContext: Option[TraceContext] = Tracer.traceContext.value
- }
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware {
val timestamp: Long = System.nanoTime()
- val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ val traceContext: Option[TraceContext] = Trace.context()
}
}
@Aspect
class SprayServerInstrumentation {
- @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
- def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {}
-
- @After("openRequestInit(openRequest, request)")
- def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = {
- Tracer.start
- openRequest.traceContext
-
- Tracer.context().map(_.tracer ! Rename(request.uri.path.toString()))
- }
-
- @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)")
- def openRequestCreation(openRequest: ContextAware): Unit = {}
-
- @After("openRequestCreation(openRequest)")
- def afterFinishingRequest(openRequest: ContextAware): Unit = {
- val original = openRequest.traceContext
- Tracer.context().map(_.tracer ! Finish())
-
- if(Tracer.context() != original) {
- println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]")
- }
- }
@Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {}
@@ -63,10 +30,10 @@ class SprayServerInstrumentation {
@After("requestRecordInit(ctx, request)")
def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = {
// Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
- for{
+ /*for{
tctx <- ctx.traceContext
host <- request.header[Host]
- } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)
+ } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)*/
}
@@ -78,12 +45,12 @@ class SprayServerInstrumentation {
def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = {
println("Completing the request with context: " + requestContext.traceContext)
- Tracer.traceContext.withValue(requestContext.traceContext) {
+ /*Tracer.context.withValue(requestContext.traceContext) {
requestContext.traceContext.map {
- tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp)
+ tctx => //tctx.tracer ! WebExternalFinish(requestContext.timestamp)
}
pjp.proceed()
- }
+ }*/
}
@@ -94,17 +61,17 @@ class SprayServerInstrumentation {
@Around("copyingRequestContext(old)")
def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = {
println("Instrumenting the request context copy.")
- Tracer.traceContext.withValue(old.traceContext) {
+ /*Tracer.traceContext.withValue(old.traceContext) {
pjp.proceed()
- }
+ }*/
}
}
-case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext
-
@Aspect
class SprayRequestContextTracing {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixin: TracingAwareContext = DefaultTracingAwareRequestContext()
+ def mixin: ContextAware = new ContextAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ }
} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala
new file mode 100644
index 00000000..6f913a67
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala
@@ -0,0 +1,27 @@
+package kamon.spray
+
+import spray.routing.directives.BasicDirectives
+import spray.routing._
+import java.util.concurrent.atomic.AtomicLong
+import scala.util.Try
+import java.net.InetAddress
+import kamon.trace.Trace
+
+trait UowDirectives extends BasicDirectives {
+ def uow: Directive0 = mapRequest { request =>
+ val uowHeader = request.headers.find(_.name == "X-UOW")
+
+ val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow)
+ // TODO: Tracer will always have a context at this point, just rename the uow.
+ //Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow))
+
+ request
+ }
+}
+
+object UowDirectives {
+ val uowCounter = new AtomicLong
+ val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
+ def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet())
+
+} \ No newline at end of file
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala
new file mode 100644
index 00000000..d5e21f35
--- /dev/null
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala
@@ -0,0 +1,56 @@
+package spray.can.server
+
+import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
+import kamon.trace.{Trace, TraceContext, ContextAware}
+import spray.http.HttpRequest
+import akka.actor.ActorSystem
+import akka.event.Logging.Warning
+
+
+@Aspect
+class ServerRequestTracing {
+
+ @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
+ def mixinContextAwareToOpenRequest: ContextAware = ContextAware.default
+
+
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
+ def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {}
+
+ @After("openRequestInit(openRequest, request)")
+ def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = {
+ val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
+ val defaultTraceName: String = request.method.value + ": " + request.uri.path
+
+ Trace.start(defaultTraceName)(system)
+
+ // Necessary to force initialization of traceContext when initiating the request.
+ openRequest.traceContext
+ }
+
+ @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)")
+ def openRequestCreation(openRequest: ContextAware): Unit = {}
+
+ @After("openRequestCreation(openRequest)")
+ def afterFinishingRequest(openRequest: ContextAware): Unit = {
+ val storedContext = openRequest.traceContext
+ val incomingContext = Trace.finish()
+
+ for(original <- storedContext) {
+ incomingContext match {
+ case Some(incoming) if original.id != incoming.id =>
+ publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]")
+
+ case Some(_) => // nothing to do here.
+
+ case None =>
+ publishWarning(s"Trace context not present while closing the Trace: [$original]")
+ }
+ }
+
+ def publishWarning(text: String): Unit = {
+ val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
+ system.eventStream.publish(Warning("", classOf[ServerRequestTracing], text))
+ }
+ }
+}
diff --git a/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala
new file mode 100644
index 00000000..4cff38be
--- /dev/null
+++ b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala
@@ -0,0 +1,49 @@
+package kamon
+
+import _root_.spray.httpx.RequestBuilding
+import _root_.spray.routing.SimpleRoutingApp
+import akka.testkit.TestKit
+import akka.actor.{ActorRef, ActorSystem}
+import org.scalatest.WordSpecLike
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import _root_.spray.client.pipelining._
+import akka.util.Timeout
+import kamon.trace.Trace
+import kamon.Kamon.Extension
+import kamon.trace.UowTracing.{Finish, Start}
+
+class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding {
+
+ "the spray server request tracing instrumentation" should {
+ "start tracing a request when entering the server and close it when responding" in new TestServer {
+ client(Get(s"http://127.0.0.1:$port/"))
+
+ within(5 seconds) {
+ val traceId = expectMsgPF() { case Start(id) => id}
+ expectMsgPF() { case Finish(traceId) => }
+ }
+ }
+ }
+
+
+
+ trait TestServer extends SimpleRoutingApp {
+
+ // Nasty, but very helpful for tests.
+ AkkaExtensionSwap.swap(system, Trace, new Extension {
+ def manager: ActorRef = testActor
+ })
+
+ implicit val timeout = Timeout(20 seconds)
+ val port: Int = Await.result(
+ startServer(interface = "127.0.0.1", port = 0)(
+ get {
+ complete("ok")
+ }
+ ), timeout.duration).localAddress.getPort
+
+ val client = sendReceive(system, system.dispatcher, timeout)
+
+ }
+}