From 2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 5 Nov 2013 18:38:39 -0300 Subject: basic separation of concerns between sub-projects --- kamon-spray/src/main/resources/META-INF/aop.xml | 10 ++++ .../SprayServerInstrumentation.scala | 59 +++++----------------- .../src/main/scala/kamon/spray/UowDirectives.scala | 27 ++++++++++ .../spray/can/server/ServerRequestTracing.scala | 56 ++++++++++++++++++++ .../scala/kamon/ServerRequestTracingSpec.scala | 49 ++++++++++++++++++ 5 files changed, 155 insertions(+), 46 deletions(-) create mode 100644 kamon-spray/src/main/resources/META-INF/aop.xml create mode 100644 kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala create mode 100644 kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala create mode 100644 kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala (limited to 'kamon-spray/src') diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..afbbb8c0 --- /dev/null +++ b/kamon-spray/src/main/resources/META-INF/aop.xml @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 743769e2..08cb53ff 100644 --- a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -1,61 +1,28 @@ -package kamon.instrumentation +package spray.can.server import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.HttpRequest import spray.http.HttpHeaders.Host +import kamon.trace.{TraceContext, Trace, ContextAware, TimedContextAware} //import spray.can.client.HttpHostConnector.RequestContext -trait ContextAware { - def traceContext: Option[TraceContext] -} -trait TimedContextAware { - def timestamp: Long - def traceContext: Option[TraceContext] -} @Aspect class SprayOpenRequestContextTracing { - @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") - def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { - val traceContext: Option[TraceContext] = Tracer.traceContext.value - } @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware { val timestamp: Long = System.nanoTime() - val traceContext: Option[TraceContext] = Tracer.traceContext.value + val traceContext: Option[TraceContext] = Trace.context() } } @Aspect class SprayServerInstrumentation { - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") - def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} - - @After("openRequestInit(openRequest, request)") - def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { - Tracer.start - openRequest.traceContext - - Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) - } - - @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") - def openRequestCreation(openRequest: ContextAware): Unit = {} - - @After("openRequestCreation(openRequest)") - def afterFinishingRequest(openRequest: ContextAware): Unit = { - val original = openRequest.traceContext - Tracer.context().map(_.tracer ! Finish()) - - if(Tracer.context() != original) { - println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") - } - } @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} @@ -63,10 +30,10 @@ class SprayServerInstrumentation { @After("requestRecordInit(ctx, request)") def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - for{ + /*for{ tctx <- ctx.traceContext host <- request.header[Host] - } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host) + } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)*/ } @@ -78,12 +45,12 @@ class SprayServerInstrumentation { def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = { println("Completing the request with context: " + requestContext.traceContext) - Tracer.traceContext.withValue(requestContext.traceContext) { + /*Tracer.context.withValue(requestContext.traceContext) { requestContext.traceContext.map { - tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp) + tctx => //tctx.tracer ! WebExternalFinish(requestContext.timestamp) } pjp.proceed() - } + }*/ } @@ -94,17 +61,17 @@ class SprayServerInstrumentation { @Around("copyingRequestContext(old)") def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = { println("Instrumenting the request context copy.") - Tracer.traceContext.withValue(old.traceContext) { + /*Tracer.traceContext.withValue(old.traceContext) { pjp.proceed() - } + }*/ } } -case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext - @Aspect class SprayRequestContextTracing { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: TracingAwareContext = DefaultTracingAwareRequestContext() + def mixin: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } } \ No newline at end of file diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala new file mode 100644 index 00000000..6f913a67 --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala @@ -0,0 +1,27 @@ +package kamon.spray + +import spray.routing.directives.BasicDirectives +import spray.routing._ +import java.util.concurrent.atomic.AtomicLong +import scala.util.Try +import java.net.InetAddress +import kamon.trace.Trace + +trait UowDirectives extends BasicDirectives { + def uow: Directive0 = mapRequest { request => + val uowHeader = request.headers.find(_.name == "X-UOW") + + val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) + // TODO: Tracer will always have a context at this point, just rename the uow. + //Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) + + request + } +} + +object UowDirectives { + val uowCounter = new AtomicLong + val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) + +} \ No newline at end of file diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala new file mode 100644 index 00000000..d5e21f35 --- /dev/null +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala @@ -0,0 +1,56 @@ +package spray.can.server + +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} +import kamon.trace.{Trace, TraceContext, ContextAware} +import spray.http.HttpRequest +import akka.actor.ActorSystem +import akka.event.Logging.Warning + + +@Aspect +class ServerRequestTracing { + + @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") + def mixinContextAwareToOpenRequest: ContextAware = ContextAware.default + + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") + def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} + + @After("openRequestInit(openRequest, request)") + def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + val defaultTraceName: String = request.method.value + ": " + request.uri.path + + Trace.start(defaultTraceName)(system) + + // Necessary to force initialization of traceContext when initiating the request. + openRequest.traceContext + } + + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") + def openRequestCreation(openRequest: ContextAware): Unit = {} + + @After("openRequestCreation(openRequest)") + def afterFinishingRequest(openRequest: ContextAware): Unit = { + val storedContext = openRequest.traceContext + val incomingContext = Trace.finish() + + for(original <- storedContext) { + incomingContext match { + case Some(incoming) if original.id != incoming.id => + publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]") + + case Some(_) => // nothing to do here. + + case None => + publishWarning(s"Trace context not present while closing the Trace: [$original]") + } + } + + def publishWarning(text: String): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + system.eventStream.publish(Warning("", classOf[ServerRequestTracing], text)) + } + } +} diff --git a/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala new file mode 100644 index 00000000..4cff38be --- /dev/null +++ b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala @@ -0,0 +1,49 @@ +package kamon + +import _root_.spray.httpx.RequestBuilding +import _root_.spray.routing.SimpleRoutingApp +import akka.testkit.TestKit +import akka.actor.{ActorRef, ActorSystem} +import org.scalatest.WordSpecLike +import scala.concurrent.Await +import scala.concurrent.duration._ +import _root_.spray.client.pipelining._ +import akka.util.Timeout +import kamon.trace.Trace +import kamon.Kamon.Extension +import kamon.trace.UowTracing.{Finish, Start} + +class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding { + + "the spray server request tracing instrumentation" should { + "start tracing a request when entering the server and close it when responding" in new TestServer { + client(Get(s"http://127.0.0.1:$port/")) + + within(5 seconds) { + val traceId = expectMsgPF() { case Start(id) => id} + expectMsgPF() { case Finish(traceId) => } + } + } + } + + + + trait TestServer extends SimpleRoutingApp { + + // Nasty, but very helpful for tests. + AkkaExtensionSwap.swap(system, Trace, new Extension { + def manager: ActorRef = testActor + }) + + implicit val timeout = Timeout(20 seconds) + val port: Int = Await.result( + startServer(interface = "127.0.0.1", port = 0)( + get { + complete("ok") + } + ), timeout.duration).localAddress.getPort + + val client = sendReceive(system, system.dispatcher, timeout) + + } +} -- cgit v1.2.3