aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-09-18 18:43:11 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-09-18 18:43:11 -0300
commitbbf7afd85809f6d43b310290b4bb9102dd36043c (patch)
treee819812cafa5bc5c92065b003311866e7a231386
parent9382ce9d66b5d6bfef515cee56f40aa178920335 (diff)
downloadKamon-bbf7afd85809f6d43b310290b4bb9102dd36043c.tar.gz
Kamon-bbf7afd85809f6d43b310290b4bb9102dd36043c.tar.bz2
Kamon-bbf7afd85809f6d43b310290b4bb9102dd36043c.zip
basic newrelic reporting
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml6
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala20
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContextSwap.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala33
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala45
-rw-r--r--kamon-core/src/main/scala/test/PingPong.scala50
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala36
-rw-r--r--kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala2
-rw-r--r--project/Build.scala2
11 files changed, 199 insertions, 30 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 0f427611..10110300 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -11,16 +11,16 @@
<aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/>
<aspect name="kamon.instrumentation.UnregisteredActorRefInstrumentation"/>
<aspect name="kamon.instrumentation.RunnableInstrumentation" />
- <aspect name="kamon.instrumentation.MessageQueueInstrumentation" />
+ <!--<aspect name="kamon.instrumentation.MessageQueueInstrumentation" />-->
- <aspect name="kamon.instrumentation.InceptionAspect"/>
+ <!--<aspect name="kamon.instrumentation.InceptionAspect"/>-->
<!-- ExecutorService Instrumentation for Akka. -->
<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
<aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>-->
<aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
<!--<aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>-->
-
+ <aspect name = "kamon.instrumentation.SprayServerInstrumentation"/>
<include within="*"/>
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 62d7f57e..a1476ae0 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -2,14 +2,14 @@ package kamon
import java.util.UUID
import akka.actor._
-import akka.agent.Agent
-import java.util.concurrent.TimeUnit
-import scala.util.{Failure, Success}
-import akka.util.Timeout
-
-
-case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) {
- implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+import java.util.concurrent.atomic.AtomicLong
+import kamon.trace.UowTraceAggregator
+import scala.concurrent.duration._
+import kamon.newrelic.NewRelicReporting
+
+// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
+case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = None) {
+ //implicit val timeout = Timeout(30, TimeUnit.SECONDS)
implicit val as = Kamon.actorSystem.dispatcher
def append(entry: TraceEntry) = entries ! entry
@@ -17,7 +17,9 @@ case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] =
}
object TraceContext {
- def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
+ val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting])
+ val traceIdCounter = new AtomicLong
+ def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds))) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
}
diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
index 24661445..4b5b66a9 100644
--- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
@@ -1,5 +1,7 @@
package kamon
+import org.slf4j.MDC
+
/**
* Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards.
*/
@@ -10,9 +12,11 @@ trait TraceContextSwap {
def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
ctx match {
case Some(context) => {
+ MDC.put("uow", context.userContext.get.asInstanceOf[String])
Tracer.set(context)
val bodyResult = primary
Tracer.clear
+ MDC.remove("uow")
bodyResult
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 84498cb8..4f0b8a08 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -2,13 +2,14 @@ package kamon.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{Props, ActorSystem, ActorRef}
+import akka.actor.{ActorCell, Props, ActorSystem, ActorRef}
import kamon.{Kamon, Tracer, TraceContext}
import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.Timer
import kamon.metric.{MetricDirectory, Metrics}
import scala.Some
import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
+import org.slf4j.MDC
case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
@@ -52,11 +53,14 @@ class ActorCellInvokeInstrumentation {
import ProceedingJoinPointPimp._
val (originalEnvelope, ctx) = instrumentation.preReceive(envelope)
+ //println("Test")
ctx match {
case Some(c) => {
+ //MDC.put("uow", c.userContext.get.asInstanceOf[String])
Tracer.set(c)
pjp.proceedWith(originalEnvelope)
Tracer.clear
+ //MDC.remove("uow")
}
case None => pjp.proceedWith(originalEnvelope)
}
@@ -74,9 +78,7 @@ class UnregisteredActorRefInstrumentation {
import ProceedingJoinPointPimp._
println("Handling unregistered actor ref message: "+message)
message match {
- case TraceableMessage(ctx, msg, timer) => {
- timer.stop()
-
+ case SimpleTraceMessage(msg, ctx) => {
ctx match {
case Some(c) => {
Tracer.set(c)
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
new file mode 100644
index 00000000..6573549d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
@@ -0,0 +1,33 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation.{After, Pointcut, Aspect}
+import kamon.Tracer
+import kamon.trace.UowTracing.{Finish, Rename}
+import spray.http.HttpRequest
+import spray.can.server.OpenRequestComponent
+
+@Aspect
+class SprayServerInstrumentation {
+
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && args(enclosing, request, closeAfterResponseCompletion, timestamp)")
+ def openRequestInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {}
+
+ @After("openRequestInit(enclosing, request, closeAfterResponseCompletion, timestamp)")
+ def afterInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {
+ //@After("openRequestInit()")
+ //def afterInit(): Unit = {
+ Tracer.start
+ println("Created the context: " + Tracer.context() + " for the transaction: " + request.uri.path.toString())
+ Tracer.context().map(_.entries ! Rename(request.uri.path.toString()))
+ }
+
+ @Pointcut("execution(* spray.can.server.OpenRequest.handleResponseEndAndReturnNextOpenRequest(..))")
+ def openRequestCreation(): Unit = {}
+
+ @After("openRequestCreation()")
+ def afterFinishingRequest(): Unit = {
+ println("Finishing a request: " + Tracer.context())
+
+ Tracer.context().map(_.entries ! Finish())
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
new file mode 100644
index 00000000..131ecba9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
@@ -0,0 +1,21 @@
+package kamon.newrelic
+
+import akka.actor.Actor
+import kamon.trace.UowTrace
+import com.newrelic.api.agent.{Trace, NewRelic}
+
+
+class NewRelicReporting extends Actor {
+ def receive = {
+ case trace: UowTrace => recordTransaction(trace)
+ }
+
+ //@Trace
+ def recordTransaction(uowTrace: UowTrace): Unit = {
+ val time = (uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9
+
+ NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat)
+ NewRelic.recordMetric("WebTransaction", time.toFloat)
+ NewRelic.recordMetric("HttpDispatcher", time.toFloat)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
new file mode 100644
index 00000000..b38d3d95
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
@@ -0,0 +1,45 @@
+package kamon.trace
+
+import akka.actor.{Props, ActorRef, Actor}
+import kamon.trace.UowTracing.{Start, Finish, Rename}
+import scala.concurrent.duration.Duration
+
+sealed trait UowSegment {
+ def timestamp: Long
+}
+
+trait AutoTimestamp extends UowSegment {
+ val timestamp = System.nanoTime
+}
+
+object UowTracing {
+ case class Start() extends AutoTimestamp
+ case class Finish() extends AutoTimestamp
+ case class Rename(name: String) extends AutoTimestamp
+}
+
+case class UowTrace(name: String, segments: Seq[UowSegment])
+
+
+class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor {
+ context.setReceiveTimeout(aggregationTimeout)
+ self ! Start()
+
+ var name: Option[String] = None
+ var segments: Seq[UowSegment] = Nil
+
+ def receive = {
+ case finish: Finish => segments = segments :+ finish; finishTracing()
+ case Rename(newName) => name = Some(newName)
+ case segment: UowSegment => segments = segments :+ segment
+ }
+
+ def finishTracing(): Unit = {
+ reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments)
+ context.stop(self)
+ }
+}
+
+object UowTraceAggregator {
+ def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
index 6ed17ec6..b78f1d79 100644
--- a/kamon-core/src/main/scala/test/PingPong.scala
+++ b/kamon-core/src/main/scala/test/PingPong.scala
@@ -1,37 +1,63 @@
package test
import akka.actor.{Deploy, Props, Actor, ActorSystem}
+import java.util.concurrent.atomic.AtomicLong
+import kamon.Tracer
+import spray.routing.SimpleRoutingApp
object PingPong extends App {
+ import scala.concurrent.duration._
+ val counter = new AtomicLong
val as = ActorSystem("ping-pong")
+ import as.dispatcher
- val pinger = as.actorOf(Props[Pinger])
- val ponger = as.actorOf(Props[Ponger])
+ Tracer.start
- pinger.tell(Pong, ponger)
-
-
- Thread.sleep(30000)
- as.shutdown()
+ for(i <- 1 to 64) {
+ val pinger = as.actorOf(Props[Pinger])
+ val ponger = as.actorOf(Props[Ponger])
+ for(_ <- 1 to 256) {
+ pinger.tell(Pong, ponger)
+ }
+ }
+ as.scheduler.schedule(1 second, 1 second) {
+ println("Processed: " + counter.getAndSet(0))
+ }
}
case object Ping
case object Pong
class Pinger extends Actor {
- val ponger = context.actorOf(Props[Ponger], "ponger#")
- val ponger2 = context.actorOf(Props[Ponger], "ponger#")
-
def receive = {
- case Pong => ponger ! Ping
+ case Pong => {
+ sender ! Ping
+ PingPong.counter.incrementAndGet()
+ }
}
}
class Ponger extends Actor {
def receive = {
- case Ping => sender ! Pong
+ case Ping => {
+ sender ! Pong; PingPong.counter.incrementAndGet()
+ }
}
}
+
+
+object SimpleRequestProcessor extends App with SimpleRoutingApp {
+ implicit val system = ActorSystem("test")
+
+ startServer(interface = "localhost", port = 9090) {
+ get {
+ path("test"){
+ complete("OK")
+ }
+ }
+ }
+
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala
new file mode 100644
index 00000000..60b5f06d
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala
@@ -0,0 +1,36 @@
+package kamon.trace
+
+import org.scalatest.{WordSpecLike, WordSpec}
+import akka.testkit.{TestKitBase, TestKit}
+import akka.actor.ActorSystem
+import scala.concurrent.duration._
+import kamon.trace.UowTracing.{Finish, Rename, Start}
+
+class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike {
+
+ "a TraceAggregator" should {
+ "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture {
+ within(1 second) {
+ aggregator ! Start()
+ aggregator ! Finish()
+
+ expectMsg(UowTrace("UNKNOWN", Seq(Start(), Finish())))
+ }
+ }
+
+ "change the uow name after receiving a Rename message" in new AggregatorFixture {
+ within(1 second) {
+ aggregator ! Start()
+ aggregator ! Rename("test-uow")
+ aggregator ! Finish()
+
+ expectMsg(UowTrace("test-uow", Seq(Start(), Finish())))
+ }
+ }
+ }
+
+
+ trait AggregatorFixture {
+ val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds))
+ }
+}
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
index 6d38f3d7..0b54cedc 100644
--- a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
+++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
@@ -13,7 +13,7 @@ trait UowDirectives extends BasicDirectives {
val generatedUow = uowHeader.map(_.value).orElse(Some(UowDirectives.newUow))
println("Generated UOW: "+generatedUow)
- Tracer.set(Tracer.newTraceContext().copy(userContext = generatedUow))
+ Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(userContext = generatedUow))
request
diff --git a/project/Build.scala b/project/Build.scala
index 0141540b..7fb935a2 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -19,7 +19,7 @@ object Build extends Build {
.settings(
libraryDependencies ++=
- compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, aspectJWeaver, metrics, sprayJson) ++
+ compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, aspectJWeaver, metrics, sprayJson, newrelic) ++
test(scalatest, akkaTestKit, sprayTestkit))
//.dependsOn(kamonDashboard)