From bbf7afd85809f6d43b310290b4bb9102dd36043c Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 18 Sep 2013 18:43:11 -0300 Subject: basic newrelic reporting --- kamon-core/src/main/resources/META-INF/aop.xml | 6 +-- kamon-core/src/main/scala/kamon/TraceContext.scala | 20 +++++---- .../src/main/scala/kamon/TraceContextSwap.scala | 4 ++ .../ActorRefTellInstrumentation.scala | 10 +++-- .../SprayServerInstrumentation.scala | 33 ++++++++++++++ .../scala/kamon/newrelic/NewRelicReporting.scala | 21 +++++++++ .../src/main/scala/kamon/trace/UowTracing.scala | 45 +++++++++++++++++++ kamon-core/src/main/scala/test/PingPong.scala | 50 ++++++++++++++++------ .../scala/kamon/trace/TraceAggregatorSpec.scala | 36 ++++++++++++++++ .../main/scala/kamon/logging/UowDirectives.scala | 2 +- project/Build.scala | 2 +- 11 files changed, 199 insertions(+), 30 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/UowTracing.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 0f427611..10110300 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -11,16 +11,16 @@ - + - + - + diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 62d7f57e..a1476ae0 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -2,14 +2,14 @@ package kamon import java.util.UUID import akka.actor._ -import akka.agent.Agent -import java.util.concurrent.TimeUnit -import scala.util.{Failure, Success} -import akka.util.Timeout - - -case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { - implicit val timeout = Timeout(30, TimeUnit.SECONDS) +import java.util.concurrent.atomic.AtomicLong +import kamon.trace.UowTraceAggregator +import scala.concurrent.duration._ +import kamon.newrelic.NewRelicReporting + +// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. +case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = None) { + //implicit val timeout = Timeout(30, TimeUnit.SECONDS) implicit val as = Kamon.actorSystem.dispatcher def append(entry: TraceEntry) = entries ! entry @@ -17,7 +17,9 @@ case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = } object TraceContext { - def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) + val traceIdCounter = new AtomicLong + def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds))) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala index 24661445..4b5b66a9 100644 --- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala +++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala @@ -1,5 +1,7 @@ package kamon +import org.slf4j.MDC + /** * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. */ @@ -10,9 +12,11 @@ trait TraceContextSwap { def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { ctx match { case Some(context) => { + MDC.put("uow", context.userContext.get.asInstanceOf[String]) Tracer.set(context) val bodyResult = primary Tracer.clear + MDC.remove("uow") bodyResult } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 84498cb8..4f0b8a08 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -2,13 +2,14 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} +import akka.actor.{ActorCell, Props, ActorSystem, ActorRef} import kamon.{Kamon, Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} import scala.Some import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage +import org.slf4j.MDC case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -52,11 +53,14 @@ class ActorCellInvokeInstrumentation { import ProceedingJoinPointPimp._ val (originalEnvelope, ctx) = instrumentation.preReceive(envelope) + //println("Test") ctx match { case Some(c) => { + //MDC.put("uow", c.userContext.get.asInstanceOf[String]) Tracer.set(c) pjp.proceedWith(originalEnvelope) Tracer.clear + //MDC.remove("uow") } case None => pjp.proceedWith(originalEnvelope) } @@ -74,9 +78,7 @@ class UnregisteredActorRefInstrumentation { import ProceedingJoinPointPimp._ println("Handling unregistered actor ref message: "+message) message match { - case TraceableMessage(ctx, msg, timer) => { - timer.stop() - + case SimpleTraceMessage(msg, ctx) => { ctx match { case Some(c) => { Tracer.set(c) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala new file mode 100644 index 00000000..6573549d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -0,0 +1,33 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{After, Pointcut, Aspect} +import kamon.Tracer +import kamon.trace.UowTracing.{Finish, Rename} +import spray.http.HttpRequest +import spray.can.server.OpenRequestComponent + +@Aspect +class SprayServerInstrumentation { + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && args(enclosing, request, closeAfterResponseCompletion, timestamp)") + def openRequestInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {} + + @After("openRequestInit(enclosing, request, closeAfterResponseCompletion, timestamp)") + def afterInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = { + //@After("openRequestInit()") + //def afterInit(): Unit = { + Tracer.start + println("Created the context: " + Tracer.context() + " for the transaction: " + request.uri.path.toString()) + Tracer.context().map(_.entries ! Rename(request.uri.path.toString())) + } + + @Pointcut("execution(* spray.can.server.OpenRequest.handleResponseEndAndReturnNextOpenRequest(..))") + def openRequestCreation(): Unit = {} + + @After("openRequestCreation()") + def afterFinishingRequest(): Unit = { + println("Finishing a request: " + Tracer.context()) + + Tracer.context().map(_.entries ! Finish()) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala new file mode 100644 index 00000000..131ecba9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -0,0 +1,21 @@ +package kamon.newrelic + +import akka.actor.Actor +import kamon.trace.UowTrace +import com.newrelic.api.agent.{Trace, NewRelic} + + +class NewRelicReporting extends Actor { + def receive = { + case trace: UowTrace => recordTransaction(trace) + } + + //@Trace + def recordTransaction(uowTrace: UowTrace): Unit = { + val time = (uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9 + + NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) + NewRelic.recordMetric("WebTransaction", time.toFloat) + NewRelic.recordMetric("HttpDispatcher", time.toFloat) + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..b38d3d95 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import akka.actor.{Props, ActorRef, Actor} +import kamon.trace.UowTracing.{Start, Finish, Rename} +import scala.concurrent.duration.Duration + +sealed trait UowSegment { + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start() extends AutoTimestamp + case class Finish() extends AutoTimestamp + case class Rename(name: String) extends AutoTimestamp +} + +case class UowTrace(name: String, segments: Seq[UowSegment]) + + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor { + context.setReceiveTimeout(aggregationTimeout) + self ! Start() + + var name: Option[String] = None + var segments: Seq[UowSegment] = Nil + + def receive = { + case finish: Finish => segments = segments :+ finish; finishTracing() + case Rename(newName) => name = Some(newName) + case segment: UowSegment => segments = segments :+ segment + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala index 6ed17ec6..b78f1d79 100644 --- a/kamon-core/src/main/scala/test/PingPong.scala +++ b/kamon-core/src/main/scala/test/PingPong.scala @@ -1,37 +1,63 @@ package test import akka.actor.{Deploy, Props, Actor, ActorSystem} +import java.util.concurrent.atomic.AtomicLong +import kamon.Tracer +import spray.routing.SimpleRoutingApp object PingPong extends App { + import scala.concurrent.duration._ + val counter = new AtomicLong val as = ActorSystem("ping-pong") + import as.dispatcher - val pinger = as.actorOf(Props[Pinger]) - val ponger = as.actorOf(Props[Ponger]) + Tracer.start - pinger.tell(Pong, ponger) - - - Thread.sleep(30000) - as.shutdown() + for(i <- 1 to 64) { + val pinger = as.actorOf(Props[Pinger]) + val ponger = as.actorOf(Props[Ponger]) + for(_ <- 1 to 256) { + pinger.tell(Pong, ponger) + } + } + as.scheduler.schedule(1 second, 1 second) { + println("Processed: " + counter.getAndSet(0)) + } } case object Ping case object Pong class Pinger extends Actor { - val ponger = context.actorOf(Props[Ponger], "ponger#") - val ponger2 = context.actorOf(Props[Ponger], "ponger#") - def receive = { - case Pong => ponger ! Ping + case Pong => { + sender ! Ping + PingPong.counter.incrementAndGet() + } } } class Ponger extends Actor { def receive = { - case Ping => sender ! Pong + case Ping => { + sender ! Pong; PingPong.counter.incrementAndGet() + } } } + + +object SimpleRequestProcessor extends App with SimpleRoutingApp { + implicit val system = ActorSystem("test") + + startServer(interface = "localhost", port = 9090) { + get { + path("test"){ + complete("OK") + } + } + } + +} diff --git a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala new file mode 100644 index 00000000..60b5f06d --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala @@ -0,0 +1,36 @@ +package kamon.trace + +import org.scalatest.{WordSpecLike, WordSpec} +import akka.testkit.{TestKitBase, TestKit} +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.trace.UowTracing.{Finish, Rename, Start} + +class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { + + "a TraceAggregator" should { + "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start() + aggregator ! Finish() + + expectMsg(UowTrace("UNKNOWN", Seq(Start(), Finish()))) + } + } + + "change the uow name after receiving a Rename message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start() + aggregator ! Rename("test-uow") + aggregator ! Finish() + + expectMsg(UowTrace("test-uow", Seq(Start(), Finish()))) + } + } + } + + + trait AggregatorFixture { + val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) + } +} diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala index 6d38f3d7..0b54cedc 100644 --- a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala +++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala @@ -13,7 +13,7 @@ trait UowDirectives extends BasicDirectives { val generatedUow = uowHeader.map(_.value).orElse(Some(UowDirectives.newUow)) println("Generated UOW: "+generatedUow) - Tracer.set(Tracer.newTraceContext().copy(userContext = generatedUow)) + Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(userContext = generatedUow)) request diff --git a/project/Build.scala b/project/Build.scala index 0141540b..7fb935a2 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -19,7 +19,7 @@ object Build extends Build { .settings( libraryDependencies ++= - compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, aspectJWeaver, metrics, sprayJson) ++ + compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, aspectJWeaver, metrics, sprayJson, newrelic) ++ test(scalatest, akkaTestKit, sprayTestkit)) //.dependsOn(kamonDashboard) -- cgit v1.2.3