From 2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 5 Nov 2013 18:38:39 -0300 Subject: basic separation of concerns between sub-projects --- .../SprayServerInstrumentation.scala | 59 +++++----------------- .../src/main/scala/kamon/spray/UowDirectives.scala | 27 ++++++++++ .../spray/can/server/ServerRequestTracing.scala | 56 ++++++++++++++++++++ 3 files changed, 96 insertions(+), 46 deletions(-) create mode 100644 kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala create mode 100644 kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala (limited to 'kamon-spray/src/main/scala') diff --git a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 743769e2..08cb53ff 100644 --- a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -1,61 +1,28 @@ -package kamon.instrumentation +package spray.can.server import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.HttpRequest import spray.http.HttpHeaders.Host +import kamon.trace.{TraceContext, Trace, ContextAware, TimedContextAware} //import spray.can.client.HttpHostConnector.RequestContext -trait ContextAware { - def traceContext: Option[TraceContext] -} -trait TimedContextAware { - def timestamp: Long - def traceContext: Option[TraceContext] -} @Aspect class SprayOpenRequestContextTracing { - @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") - def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { - val traceContext: Option[TraceContext] = Tracer.traceContext.value - } @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware { val timestamp: Long = System.nanoTime() - val traceContext: Option[TraceContext] = Tracer.traceContext.value + val traceContext: Option[TraceContext] = Trace.context() } } @Aspect class SprayServerInstrumentation { - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") - def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} - - @After("openRequestInit(openRequest, request)") - def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { - Tracer.start - openRequest.traceContext - - Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) - } - - @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") - def openRequestCreation(openRequest: ContextAware): Unit = {} - - @After("openRequestCreation(openRequest)") - def afterFinishingRequest(openRequest: ContextAware): Unit = { - val original = openRequest.traceContext - Tracer.context().map(_.tracer ! Finish()) - - if(Tracer.context() != original) { - println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") - } - } @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} @@ -63,10 +30,10 @@ class SprayServerInstrumentation { @After("requestRecordInit(ctx, request)") def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - for{ + /*for{ tctx <- ctx.traceContext host <- request.header[Host] - } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host) + } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)*/ } @@ -78,12 +45,12 @@ class SprayServerInstrumentation { def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = { println("Completing the request with context: " + requestContext.traceContext) - Tracer.traceContext.withValue(requestContext.traceContext) { + /*Tracer.context.withValue(requestContext.traceContext) { requestContext.traceContext.map { - tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp) + tctx => //tctx.tracer ! WebExternalFinish(requestContext.timestamp) } pjp.proceed() - } + }*/ } @@ -94,17 +61,17 @@ class SprayServerInstrumentation { @Around("copyingRequestContext(old)") def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = { println("Instrumenting the request context copy.") - Tracer.traceContext.withValue(old.traceContext) { + /*Tracer.traceContext.withValue(old.traceContext) { pjp.proceed() - } + }*/ } } -case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext - @Aspect class SprayRequestContextTracing { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: TracingAwareContext = DefaultTracingAwareRequestContext() + def mixin: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } } \ No newline at end of file diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala new file mode 100644 index 00000000..6f913a67 --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala @@ -0,0 +1,27 @@ +package kamon.spray + +import spray.routing.directives.BasicDirectives +import spray.routing._ +import java.util.concurrent.atomic.AtomicLong +import scala.util.Try +import java.net.InetAddress +import kamon.trace.Trace + +trait UowDirectives extends BasicDirectives { + def uow: Directive0 = mapRequest { request => + val uowHeader = request.headers.find(_.name == "X-UOW") + + val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) + // TODO: Tracer will always have a context at this point, just rename the uow. + //Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) + + request + } +} + +object UowDirectives { + val uowCounter = new AtomicLong + val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) + +} \ No newline at end of file diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala new file mode 100644 index 00000000..d5e21f35 --- /dev/null +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala @@ -0,0 +1,56 @@ +package spray.can.server + +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} +import kamon.trace.{Trace, TraceContext, ContextAware} +import spray.http.HttpRequest +import akka.actor.ActorSystem +import akka.event.Logging.Warning + + +@Aspect +class ServerRequestTracing { + + @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") + def mixinContextAwareToOpenRequest: ContextAware = ContextAware.default + + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") + def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} + + @After("openRequestInit(openRequest, request)") + def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + val defaultTraceName: String = request.method.value + ": " + request.uri.path + + Trace.start(defaultTraceName)(system) + + // Necessary to force initialization of traceContext when initiating the request. + openRequest.traceContext + } + + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") + def openRequestCreation(openRequest: ContextAware): Unit = {} + + @After("openRequestCreation(openRequest)") + def afterFinishingRequest(openRequest: ContextAware): Unit = { + val storedContext = openRequest.traceContext + val incomingContext = Trace.finish() + + for(original <- storedContext) { + incomingContext match { + case Some(incoming) if original.id != incoming.id => + publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]") + + case Some(_) => // nothing to do here. + + case None => + publishWarning(s"Trace context not present while closing the Trace: [$original]") + } + } + + def publishWarning(text: String): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + system.eventStream.publish(Warning("", classOf[ServerRequestTracing], text)) + } + } +} -- cgit v1.2.3