From 5ffb3a50be348237fda9a8176b508284c59261af Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Thu, 19 Sep 2013 08:58:34 -0300 Subject: Envelope Instrumentation and some cleanup --- kamon-core/src/main/resources/META-INF/aop.xml | 3 +- kamon-core/src/main/resources/application.conf | 5 +- kamon-core/src/main/scala/kamon/TraceContext.scala | 21 +++-- .../ActorRefTellInstrumentation.scala | 91 +++++----------------- .../SprayServerInstrumentation.scala | 46 ++++------- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 19 +++++ .../src/main/scala/kamon/trace/UowTracing.scala | 4 - .../kamon/trace/context/TracingAwareContext.scala | 8 ++ 8 files changed, 76 insertions(+), 121 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 79692dd3..efdce792 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -7,9 +7,8 @@ - + - diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index c87c0ced..1b564f7e 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -1,9 +1,10 @@ akka { loglevel = INFO stdout-loglevel = INFO - log-dead-letters = off + log-dead-letters = on - #extensions = ["kamon.dashboard.DashboardExtension"] + #extensions = ["kamon.dashboard.DashboardExtension"] + akka.loggers = ["kamon.newrelic.NewRelicErrorLogger"] actor { default-dispatcher { diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 73186a18..23da7001 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicLong import kamon.trace.UowTraceAggregator import scala.concurrent.duration._ import kamon.newrelic.NewRelicReporting +import kamon.trace.UowTracing.Start // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = None) { @@ -19,7 +20,13 @@ case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = object TraceContext { val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) val traceIdCounter = new AtomicLong - def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), "tracer-"+traceIdCounter.incrementAndGet())) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + + def apply()(implicit system: ActorSystem) = { + val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${traceIdCounter.incrementAndGet()}") + actor ! Start() + + new TraceContext(100, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + } } @@ -32,20 +39,10 @@ class TraceAccumulator extends Actor { trait TraceEntry - case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry - - - case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) - - - - -object Collector { - -} +object Collector trait TraceEntryStorage { def store(entry: TraceEntry): Boolean diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index df124f41..43841165 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -2,105 +2,54 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ActorCell, Props, ActorSystem, ActorRef} -import kamon.{Kamon, Tracer, TraceContext} -import akka.dispatch.{MessageDispatcher, Envelope} +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Tracer, TraceContext} +import akka.dispatch.{Envelope, MessageDispatcher} import com.codahale.metrics.Timer -import kamon.metric.{MetricDirectory, Metrics} import scala.Some -import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage -import org.slf4j.MDC +import kamon.trace.context.TracingAwareContext case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) +case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext - -@Aspect -class ActorRefTellInstrumentation { - import ProceedingJoinPointPimp._ - - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") - def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} - - @Around("sendingMessageToActorRef(actor, message, sender)") - def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - import kamon.Instrument.instrumentation.sendMessageTransformation - //println(s"====> [$sender] => [$actor] --- $message") - pjp.proceedWithTarget(actor, sendMessageTransformation(sender, actor, message).asInstanceOf[AnyRef], sender) - } -} - - - -@Aspect("""perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))""") +@Aspect("perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))") class ActorCellInvokeInstrumentation { - var instrumentation = ActorReceiveInvokeInstrumentation.noopPreReceive - var self: ActorRef = _ - // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - @After("actorCellCreation(system, ref, props, dispatcher, parent)") - def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - instrumentation = kamon.Instrument.instrumentation.receiveInvokeInstrumentation(system, ref, props, dispatcher, parent) - self = ref - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import ProceedingJoinPointPimp._ - - val (originalEnvelope, ctx) = instrumentation.preReceive(envelope) - //println(s"====>[$ctx] ## [${originalEnvelope.sender}] => [$self] --- ${originalEnvelope.message}") - ctx match { + //safe cast + envelope.asInstanceOf[TracingAwareContext].traceContext match { case Some(c) => { - //MDC.put("uow", c.userContext.get.asInstanceOf[String]) Tracer.set(c) - pjp.proceedWith(originalEnvelope) + pjp.proceed() Tracer.clear - //MDC.remove("uow") } case None => assert(Tracer.context() == None) - pjp.proceedWith(originalEnvelope) + pjp.proceed() } Tracer.clear } } - @Aspect -class UnregisteredActorRefInstrumentation { - @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)") - def sprayResponderHandle(message: Any, sender: ActorRef) = {} +class EnvelopeTracingContext { - @Around("sprayResponderHandle(message, sender)") - def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import ProceedingJoinPointPimp._ - //println("Handling unregistered actor ref message: "+message) + @DeclareMixin("akka.dispatch.Envelope") + def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext() - message match { - case SimpleTraceMessage(msg, ctx) => { - ctx match { - case Some(c) => { - Tracer.set(c) - pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. - Tracer.clear - } - case None => - assert(Tracer.context() == None) - pjp.proceedWith(msg.asInstanceOf[AnyRef]) - } - } - case _ => - //assert(Tracer.context() == None) - pjp.proceed - } + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def requestRecordInit(ctx: TracingAwareContext): Unit = {} + + @After("requestRecordInit(ctx)") + def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { + // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. + ctx.traceContext } } - - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 4eafcebe..0299c4c5 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -2,11 +2,12 @@ package kamon.instrumentation import org.aspectj.lang.annotation.{DeclareMixin, After, Pointcut, Aspect} import kamon.{TraceContext, Tracer} -import kamon.trace.UowTracing.{WebExternal, Finish, Rename} +import kamon.trace.UowTracing.{Finish, Rename} import spray.http.HttpRequest import spray.can.server.{OpenRequest, OpenRequestComponent} -import spray.can.client.HttpHostConnector.RequestContext -import spray.http.HttpHeaders.Host +import kamon.trace.context.TracingAwareContext + +//import spray.can.client.HttpHostConnector.RequestContext trait ContextAware { def traceContext: Option[TraceContext] @@ -41,59 +42,44 @@ class SprayServerInstrumentation { @After("openRequestCreation()") def afterFinishingRequest(): Unit = { - //println("Finishing a request: " + Tracer.context()) +// println("Finishing a request: " + Tracer.context()) Tracer.context().map(_.entries ! Finish()) -/* + if(Tracer.context().isEmpty) { println("WOOOOOPAAAAAAAAA") - }*/ + } } - - - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx)") - def requestRecordInit(ctx: TracingAwareRequestContext): Unit = {} + def requestRecordInit(ctx: TracingAwareContext): Unit = {} @After("requestRecordInit(ctx)") - def whenCreatedRequestRecord(ctx: TracingAwareRequestContext): Unit = { + def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - ctx.context + ctx.traceContext } - - - - - @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(ctx, msg)") - def requestRecordInit2(ctx: TracingAwareRequestContext, msg: Any): Unit = {} + def requestRecordInit2(ctx: TracingAwareContext, msg: Any): Unit = {} @After("requestRecordInit2(ctx, msg)") - def whenCreatedRequestRecord2(ctx: TracingAwareRequestContext, msg: Any): Unit = { + def whenCreatedRequestRecord2(ctx: TracingAwareContext, msg: Any): Unit = { //println("=======> Spent in WEB External: " + (System.nanoTime() - ctx.timestamp)) // TODO: REMOVE THIS: - val request = (ctx.asInstanceOf[RequestContext]).request +// val request = (ctx.asInstanceOf[RequestContext]).request - ctx.context.map(_.entries ! WebExternal(ctx.timestamp, System.nanoTime(), request.header[Host].map(_.host).getOrElse("UNKNOWN"))) +// ctx.context.map(_.entries ! WebExternal(ctx.timestamp, System.nanoTime(), request.header[Host].map(_.host).getOrElse("UNKNOWN"))) } } -trait TracingAwareRequestContext { - def context: Option[TraceContext] - def timestamp: Long -} - -case class DefaultTracingAwareRequestContext(context: Option[TraceContext] = Tracer.context(), - timestamp: Long = System.nanoTime) extends TracingAwareRequestContext - +case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext @Aspect class SprayRequestContextTracing { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: TracingAwareRequestContext = DefaultTracingAwareRequestContext() + def mixin: TracingAwareContext = DefaultTracingAwareRequestContext() } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala new file mode 100644 index 00000000..72bcb4e2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -0,0 +1,19 @@ +package kamon.newrelic + +import akka.actor.Actor +import akka.event.Logging.Error +import akka.event.Logging.{LoggerInitialized, InitializeLogger} +import com.newrelic.api.agent.NewRelic +import NewRelic.noticeError + +class NewRelicErrorLogger extends Actor { + def receive = { + case InitializeLogger(_) => sender ! LoggerInitialized + case error @ Error(cause, logSource, logClass, message) => notifyError(error) + } + + def notifyError(error: Error): Unit = { + println(error.message) + noticeError(error.cause) + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala index 48def942..c794656d 100644 --- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -1,12 +1,9 @@ package kamon.trace import akka.actor._ -import kamon.trace.UowTracing.{Start, Finish, Rename} import scala.concurrent.duration.Duration import kamon.trace.UowTracing.Finish import kamon.trace.UowTracing.Rename -import kamon.trace.UowTrace -import kamon.trace.UowTracing.Start import scala.Some sealed trait UowSegment { @@ -29,7 +26,6 @@ case class UowTrace(name: String, segments: Seq[UowSegment]) class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { context.setReceiveTimeout(aggregationTimeout) - self ! Start() var name: Option[String] = None var segments: Seq[UowSegment] = Nil diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala new file mode 100644 index 00000000..3766dd22 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala @@ -0,0 +1,8 @@ +package kamon.trace.context + +import kamon.TraceContext + +trait TracingAwareContext { + def traceContext: Option[TraceContext] + def timestamp: Long +} \ No newline at end of file -- cgit v1.2.3