aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-09-18 18:43:11 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-09-18 18:43:11 -0300
commitbbf7afd85809f6d43b310290b4bb9102dd36043c (patch)
treee819812cafa5bc5c92065b003311866e7a231386 /kamon-core/src/main/scala/kamon
parent9382ce9d66b5d6bfef515cee56f40aa178920335 (diff)
downloadKamon-bbf7afd85809f6d43b310290b4bb9102dd36043c.tar.gz
Kamon-bbf7afd85809f6d43b310290b4bb9102dd36043c.tar.bz2
Kamon-bbf7afd85809f6d43b310290b4bb9102dd36043c.zip
basic newrelic reporting
Diffstat (limited to 'kamon-core/src/main/scala/kamon')
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala20
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContextSwap.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala33
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala45
6 files changed, 120 insertions, 13 deletions
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 62d7f57e..a1476ae0 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -2,14 +2,14 @@ package kamon
import java.util.UUID
import akka.actor._
-import akka.agent.Agent
-import java.util.concurrent.TimeUnit
-import scala.util.{Failure, Success}
-import akka.util.Timeout
-
-
-case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) {
- implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+import java.util.concurrent.atomic.AtomicLong
+import kamon.trace.UowTraceAggregator
+import scala.concurrent.duration._
+import kamon.newrelic.NewRelicReporting
+
+// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
+case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = None) {
+ //implicit val timeout = Timeout(30, TimeUnit.SECONDS)
implicit val as = Kamon.actorSystem.dispatcher
def append(entry: TraceEntry) = entries ! entry
@@ -17,7 +17,9 @@ case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] =
}
object TraceContext {
- def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
+ val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting])
+ val traceIdCounter = new AtomicLong
+ def apply()(implicit system: ActorSystem) = new TraceContext(100, system.actorOf(UowTraceAggregator.props(reporter, 30 seconds))) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
}
diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
index 24661445..4b5b66a9 100644
--- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
@@ -1,5 +1,7 @@
package kamon
+import org.slf4j.MDC
+
/**
* Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards.
*/
@@ -10,9 +12,11 @@ trait TraceContextSwap {
def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
ctx match {
case Some(context) => {
+ MDC.put("uow", context.userContext.get.asInstanceOf[String])
Tracer.set(context)
val bodyResult = primary
Tracer.clear
+ MDC.remove("uow")
bodyResult
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 84498cb8..4f0b8a08 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -2,13 +2,14 @@ package kamon.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{Props, ActorSystem, ActorRef}
+import akka.actor.{ActorCell, Props, ActorSystem, ActorRef}
import kamon.{Kamon, Tracer, TraceContext}
import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.Timer
import kamon.metric.{MetricDirectory, Metrics}
import scala.Some
import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
+import org.slf4j.MDC
case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
@@ -52,11 +53,14 @@ class ActorCellInvokeInstrumentation {
import ProceedingJoinPointPimp._
val (originalEnvelope, ctx) = instrumentation.preReceive(envelope)
+ //println("Test")
ctx match {
case Some(c) => {
+ //MDC.put("uow", c.userContext.get.asInstanceOf[String])
Tracer.set(c)
pjp.proceedWith(originalEnvelope)
Tracer.clear
+ //MDC.remove("uow")
}
case None => pjp.proceedWith(originalEnvelope)
}
@@ -74,9 +78,7 @@ class UnregisteredActorRefInstrumentation {
import ProceedingJoinPointPimp._
println("Handling unregistered actor ref message: "+message)
message match {
- case TraceableMessage(ctx, msg, timer) => {
- timer.stop()
-
+ case SimpleTraceMessage(msg, ctx) => {
ctx match {
case Some(c) => {
Tracer.set(c)
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
new file mode 100644
index 00000000..6573549d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
@@ -0,0 +1,33 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation.{After, Pointcut, Aspect}
+import kamon.Tracer
+import kamon.trace.UowTracing.{Finish, Rename}
+import spray.http.HttpRequest
+import spray.can.server.OpenRequestComponent
+
+@Aspect
+class SprayServerInstrumentation {
+
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && args(enclosing, request, closeAfterResponseCompletion, timestamp)")
+ def openRequestInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {}
+
+ @After("openRequestInit(enclosing, request, closeAfterResponseCompletion, timestamp)")
+ def afterInit(enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {
+ //@After("openRequestInit()")
+ //def afterInit(): Unit = {
+ Tracer.start
+ println("Created the context: " + Tracer.context() + " for the transaction: " + request.uri.path.toString())
+ Tracer.context().map(_.entries ! Rename(request.uri.path.toString()))
+ }
+
+ @Pointcut("execution(* spray.can.server.OpenRequest.handleResponseEndAndReturnNextOpenRequest(..))")
+ def openRequestCreation(): Unit = {}
+
+ @After("openRequestCreation()")
+ def afterFinishingRequest(): Unit = {
+ println("Finishing a request: " + Tracer.context())
+
+ Tracer.context().map(_.entries ! Finish())
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
new file mode 100644
index 00000000..131ecba9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
@@ -0,0 +1,21 @@
+package kamon.newrelic
+
+import akka.actor.Actor
+import kamon.trace.UowTrace
+import com.newrelic.api.agent.{Trace, NewRelic}
+
+
+class NewRelicReporting extends Actor {
+ def receive = {
+ case trace: UowTrace => recordTransaction(trace)
+ }
+
+ //@Trace
+ def recordTransaction(uowTrace: UowTrace): Unit = {
+ val time = (uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9
+
+ NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat)
+ NewRelic.recordMetric("WebTransaction", time.toFloat)
+ NewRelic.recordMetric("HttpDispatcher", time.toFloat)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
new file mode 100644
index 00000000..b38d3d95
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
@@ -0,0 +1,45 @@
+package kamon.trace
+
+import akka.actor.{Props, ActorRef, Actor}
+import kamon.trace.UowTracing.{Start, Finish, Rename}
+import scala.concurrent.duration.Duration
+
+sealed trait UowSegment {
+ def timestamp: Long
+}
+
+trait AutoTimestamp extends UowSegment {
+ val timestamp = System.nanoTime
+}
+
+object UowTracing {
+ case class Start() extends AutoTimestamp
+ case class Finish() extends AutoTimestamp
+ case class Rename(name: String) extends AutoTimestamp
+}
+
+case class UowTrace(name: String, segments: Seq[UowSegment])
+
+
+class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor {
+ context.setReceiveTimeout(aggregationTimeout)
+ self ! Start()
+
+ var name: Option[String] = None
+ var segments: Seq[UowSegment] = Nil
+
+ def receive = {
+ case finish: Finish => segments = segments :+ finish; finishTracing()
+ case Rename(newName) => name = Some(newName)
+ case segment: UowSegment => segments = segments :+ segment
+ }
+
+ def finishTracing(): Unit = {
+ reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments)
+ context.stop(self)
+ }
+}
+
+object UowTraceAggregator {
+ def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
+} \ No newline at end of file