From bbf7afd85809f6d43b310290b4bb9102dd36043c Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 18 Sep 2013 18:43:11 -0300 Subject: basic newrelic reporting --- kamon-core/src/main/scala/kamon/TraceContext.scala | 20 +++++----- .../src/main/scala/kamon/TraceContextSwap.scala | 4 ++ .../ActorRefTellInstrumentation.scala | 10 +++-- .../SprayServerInstrumentation.scala | 33 ++++++++++++++++ .../scala/kamon/newrelic/NewRelicReporting.scala | 21 ++++++++++ .../src/main/scala/kamon/trace/UowTracing.scala | 45 ++++++++++++++++++++++ 6 files changed, 120 insertions(+), 13 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/UowTracing.scala (limited to 'kamon-core/src/main/scala/kamon') diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 62d7f57e..a1476ae0 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -2,14 +2,14 @@ package kamon import java.util.UUID import akka.actor._ -import akka.agent.Agent -import java.util.concurrent.TimeUnit -import scala.util.{Failure, Success} -import akka.util.Timeout - - -case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { - implicit val timeout = Timeout(30, TimeUnit.SECONDS) +import java.util.concurrent.atomic.AtomicLong +import kamon.trace.UowTraceAggregator +import scala.concurrent.duration._ +import kamon.newrelic.NewRelicReporting + +// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. +case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = None) { + //implicit val timeout = Timeout(30, TimeUnit.SECONDS) implicit val as = Kamon.actorSystem.dispatcher def append(entry: TraceEntry) = entries ! entry @@ -17,7 +17,9 @@ case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = } object TraceContext { - def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) + val traceIdCounter = new AtomicLong + def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds))) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala index 24661445..4b5b66a9 100644 --- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala +++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala @@ -1,5 +1,7 @@ package kamon +import org.slf4j.MDC + /** * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. */ @@ -10,9 +12,11 @@ trait TraceContextSwap { def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { ctx match { case Some(context) => { + MDC.put("uow", context.userContext.get.asInstanceOf[String]) Tracer.set(context) val bodyResult = primary Tracer.clear + MDC.remove("uow") bodyResult } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 84498cb8..4f0b8a08 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -2,13 +2,14 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} +import akka.actor.{ActorCell, Props, ActorSystem, ActorRef} import kamon.{Kamon, Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} import scala.Some import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage +import org.slf4j.MDC case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -52,11 +53,14 @@ class ActorCellInvokeInstrumentation { import ProceedingJoinPointPimp._ val (originalEnvelope, ctx) = instrumentation.preReceive(envelope) + //println("Test") ctx match { case Some(c) => { + //MDC.put("uow", c.userContext.get.asInstanceOf[String]) Tracer.set(c) pjp.proceedWith(originalEnvelope) Tracer.clear + //MDC.remove("uow") } case None => pjp.proceedWith(originalEnvelope) } @@ -74,9 +78,7 @@ class UnregisteredActorRefInstrumentation { import ProceedingJoinPointPimp._ println("Handling unregistered actor ref message: "+message) message match { - case TraceableMessage(ctx, msg, timer) => { - timer.stop() - + case SimpleTraceMessage(msg, ctx) => { ctx match { case Some(c) => { Tracer.set(c) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala new file mode 100644 index 00000000..6573549d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -0,0 +1,33 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{After, Pointcut, Aspect} +import kamon.Tracer +import kamon.trace.UowTracing.{Finish, Rename} +import spray.http.HttpRequest +import spray.can.server.OpenRequestComponent + +@Aspect +class SprayServerInstrumentation { + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && args(enclosing, request, closeAfterResponseCompletion, timestamp)") + def openRequestInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {} + + @After("openRequestInit(enclosing, request, closeAfterResponseCompletion, timestamp)") + def afterInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = { + //@After("openRequestInit()") + //def afterInit(): Unit = { + Tracer.start + println("Created the context: " + Tracer.context() + " for the transaction: " + request.uri.path.toString()) + Tracer.context().map(_.entries ! Rename(request.uri.path.toString())) + } + + @Pointcut("execution(* spray.can.server.OpenRequest.handleResponseEndAndReturnNextOpenRequest(..))") + def openRequestCreation(): Unit = {} + + @After("openRequestCreation()") + def afterFinishingRequest(): Unit = { + println("Finishing a request: " + Tracer.context()) + + Tracer.context().map(_.entries ! Finish()) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala new file mode 100644 index 00000000..131ecba9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -0,0 +1,21 @@ +package kamon.newrelic + +import akka.actor.Actor +import kamon.trace.UowTrace +import com.newrelic.api.agent.{Trace, NewRelic} + + +class NewRelicReporting extends Actor { + def receive = { + case trace: UowTrace => recordTransaction(trace) + } + + //@Trace + def recordTransaction(uowTrace: UowTrace): Unit = { + val time = (uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9 + + NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) + NewRelic.recordMetric("WebTransaction", time.toFloat) + NewRelic.recordMetric("HttpDispatcher", time.toFloat) + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..b38d3d95 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import akka.actor.{Props, ActorRef, Actor} +import kamon.trace.UowTracing.{Start, Finish, Rename} +import scala.concurrent.duration.Duration + +sealed trait UowSegment { + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start() extends AutoTimestamp + case class Finish() extends AutoTimestamp + case class Rename(name: String) extends AutoTimestamp +} + +case class UowTrace(name: String, segments: Seq[UowSegment]) + + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor { + context.setReceiveTimeout(aggregationTimeout) + self ! Start() + + var name: Option[String] = None + var segments: Seq[UowSegment] = Nil + + def receive = { + case finish: Finish => segments = segments :+ finish; finishTracing() + case Rename(newName) => name = Some(newName) + case segment: UowSegment => segments = segments :+ segment + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +} \ No newline at end of file -- cgit v1.2.3