From 604d5801332838f8bea25fe25cb8df5dbb82af08 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 25 Sep 2013 19:45:54 -0300 Subject: wip --- kamon-core/src/main/scala/kamon/TraceContext.scala | 2 +- .../ActorRefTellInstrumentation.scala | 4 ++- .../SprayServerInstrumentation.scala | 33 ++++++++++++++++------ .../scala/kamon/newrelic/NewRelicReporting.scala | 5 ---- .../src/main/scala/kamon/trace/UowTracing.scala | 12 ++++++-- 5 files changed, 39 insertions(+), 17 deletions(-) (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index a1476ae0..73186a18 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -19,7 +19,7 @@ case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = object TraceContext { val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) val traceIdCounter = new AtomicLong - def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds))) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), "tracer-"+traceIdCounter.incrementAndGet())) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index fdd7b696..d92d7f6c 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -34,6 +34,7 @@ class ActorRefTellInstrumentation { @Aspect("""perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))""") class ActorCellInvokeInstrumentation { var instrumentation = ActorReceiveInvokeInstrumentation.noopPreReceive + var self: ActorRef = _ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") @@ -42,6 +43,7 @@ class ActorCellInvokeInstrumentation { @After("actorCellCreation(system, ref, props, dispatcher, parent)") def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { instrumentation = kamon.Instrument.instrumentation.receiveInvokeInstrumentation(system, ref, props, dispatcher, parent) + self = ref } @@ -53,7 +55,7 @@ class ActorCellInvokeInstrumentation { import ProceedingJoinPointPimp._ val (originalEnvelope, ctx) = instrumentation.preReceive(envelope) - //println("Test") + //println(s"====>[$ctx] ## [${originalEnvelope.sender}] => [$self] --- ${originalEnvelope.message}") ctx match { case Some(c) => { //MDC.put("uow", c.userContext.get.asInstanceOf[String]) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index f8ab709e..9422a9f7 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -4,26 +4,39 @@ import org.aspectj.lang.annotation.{DeclareMixin, After, Pointcut, Aspect} import kamon.{TraceContext, Tracer} import kamon.trace.UowTracing.{WebExternal, Finish, Rename} import spray.http.HttpRequest -import spray.can.server.OpenRequestComponent +import spray.can.server.{OpenRequest, OpenRequestComponent} import spray.can.client.HttpHostConnector.RequestContext import spray.http.HttpHeaders.Host +trait ContextAware { + def traceContext: Option[TraceContext] +} + +@Aspect +class SprayOpenRequestContextTracing { + @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") + def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Tracer.context() + } +} + @Aspect class SprayServerInstrumentation { - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && args(enclosing, request, closeAfterResponseCompletion, timestamp)") - def openRequestInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {} + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(enclosing, request, closeAfterResponseCompletion, timestamp)") + def openRequestInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {} - @After("openRequestInit(enclosing, request, closeAfterResponseCompletion, timestamp)") - def afterInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = { + @After("openRequestInit(openRequest, enclosing, request, closeAfterResponseCompletion, timestamp)") + def afterInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = { //@After("openRequestInit()") //def afterInit(): Unit = { Tracer.start - //println("Created the context: " + Tracer.context() + " for the transaction: " + request.uri.path.toString()) + //openRequest.traceContext + println("Created the context: " + Tracer.context() + " for the transaction: " + request) Tracer.context().map(_.entries ! Rename(request.uri.path.toString())) } - @Pointcut("execution(* spray.can.server.OpenRequest.handleResponseEndAndReturnNextOpenRequest(..))") + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..))") def openRequestCreation(): Unit = {} @After("openRequestCreation()") @@ -31,6 +44,10 @@ class SprayServerInstrumentation { println("Finishing a request: " + Tracer.context()) Tracer.context().map(_.entries ! Finish()) + + if(Tracer.context().isEmpty) { + println("WOOOOOPAAAAAAAAA") + } } @@ -55,7 +72,7 @@ class SprayServerInstrumentation { @After("requestRecordInit2(ctx, msg)") def whenCreatedRequestRecord2(ctx: TracingAwareRequestContext, msg: Any): Unit = { - println("=======> Spent in WEB External: " + (System.nanoTime() - ctx.timestamp)) + //println("=======> Spent in WEB External: " + (System.nanoTime() - ctx.timestamp)) // TODO: REMOVE THIS: val request = (ctx.asInstanceOf[RequestContext]).request diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 33f169da..31e50cfe 100644 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -24,11 +24,6 @@ class NewRelicReporting extends Actor { NewRelic.recordMetric(s"External/all", external) NewRelic.recordMetric(s"External/allWeb", external) - NewRelic.recordMetric(s"Solr/all", 0.1F) - NewRelic.recordMetric(s"Solr/allWeb", 0.1F) - NewRelic.recordMetric(s"Solr/set", 0.1F) - NewRelic.recordMetric(s"Solr/set/WebTransaction/Custom/test", 0.1F) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external) NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/" + "WebTransaction/Custom" + uowTrace.name, external) diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala index 8efc46b2..48def942 100644 --- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -1,8 +1,13 @@ package kamon.trace -import akka.actor.{Props, ActorRef, Actor} +import akka.actor._ import kamon.trace.UowTracing.{Start, Finish, Rename} import scala.concurrent.duration.Duration +import kamon.trace.UowTracing.Finish +import kamon.trace.UowTracing.Rename +import kamon.trace.UowTrace +import kamon.trace.UowTracing.Start +import scala.Some sealed trait UowSegment { def timestamp: Long @@ -22,7 +27,7 @@ object UowTracing { case class UowTrace(name: String, segments: Seq[UowSegment]) -class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor { +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { context.setReceiveTimeout(aggregationTimeout) self ! Start() @@ -33,6 +38,9 @@ class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) exte case finish: Finish => segments = segments :+ finish; finishTracing() case Rename(newName) => name = Some(newName) case segment: UowSegment => segments = segments :+ segment + case ReceiveTimeout => + log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) + context.stop(self) } def finishTracing(): Unit = { -- cgit v1.2.3