aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-12 19:00:49 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-12 19:00:49 -0300
commit18656c723881ebfd8ade43a990fe73beba5690d0 (patch)
tree34b563bc5f8913f054df15dad47a8d8e4b5b0e36 /kamon-core/src/main/scala
parent66957f2632eaccae4e3a354b8787fded8c6369d2 (diff)
downloadKamon-18656c723881ebfd8ade43a990fe73beba5690d0.tar.gz
Kamon-18656c723881ebfd8ade43a990fe73beba5690d0.tar.bz2
Kamon-18656c723881ebfd8ade43a990fe73beba5690d0.zip
fixed the instrumentation to work nicely with spray
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala36
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/executor/eventbus.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala49
4 files changed, 54 insertions, 64 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index c3080909..07773c55 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -8,33 +8,8 @@ import scala.concurrent.duration.{FiniteDuration, Duration}
import com.newrelic.api.agent.NewRelic
object Kamon {
-
- val ctx = new ThreadLocal[Option[TraceContext]] {
- override def initialValue() = None
- }
-
implicit lazy val actorSystem = ActorSystem("kamon")
-
- def context() = ctx.get()
- def clear = ctx.remove()
- def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
-
- def start = set(newTraceContext)
- def stop = ctx.get match {
- case Some(context) => context.close
- case None =>
- }
-
- def newTraceContext(): TraceContext = TraceContext()
-
-
- val publisher = actorSystem.actorOf(Props[TransactionPublisher])
-
- def publish(tx: FullTransaction) = publisher ! tx
-
-
-
object Metric {
val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
@@ -44,21 +19,12 @@ object Kamon {
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
}
-
-
val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
}
-
-
-
-
-
-
-
object Tracer {
val ctx = new ThreadLocal[Option[TraceContext]] {
override def initialValue() = None
@@ -74,7 +40,7 @@ object Tracer {
case None =>
}
- //def newTraceContext(): TraceContext = TraceContext()
+ def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem)
}
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 6b32550f..62d7f57e 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -1,31 +1,34 @@
package kamon
import java.util.UUID
-import akka.actor.{ActorSystem, ActorPath}
+import akka.actor._
import akka.agent.Agent
import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success}
import akka.util.Timeout
-case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) {
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
implicit val as = Kamon.actorSystem.dispatcher
- def append(entry: TraceEntry) = entries send (entry :: _)
- def close = entries.future.onComplete({
- case Success(list) => Kamon.publish(FullTransaction(id, list))
- case Failure(t) => println("WTF!")
- })
+ def append(entry: TraceEntry) = entries ! entry
+ def close = entries ! "Close" // TODO type this thing!.
}
object TraceContext {
- implicit val as2 = Kamon.actorSystem.dispatcher
- def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+ def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
}
+class TraceAccumulator extends Actor {
+ def receive = {
+ case a => println("Trace Accumulated: "+a)
+ }
+}
+
+
trait TraceEntry
case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
index 599f2a7a..33ff4a4e 100644
--- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -5,7 +5,7 @@ import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit
-import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
+import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
import scala.util.{Random, Success, Failure}
import scala.concurrent.Future
@@ -66,14 +66,14 @@ object TryAkka extends App{
}
}))
- Kamon.start
+ Tracer.start
for(i <- 1 to 4) {
val ping = system.actorOf(Props[PingActor])
ping ! Pong()
}
- def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body")
/*
val newRelicReporter = new NewRelicReporter(registry)
@@ -86,13 +86,13 @@ object TryAkka extends App{
- Kamon.start
+ Tracer.start
- Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
+ Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
threadPrintln("Before doing it")
val f = Future { threadPrintln("This is happening inside the future body") }
- Kamon.stop
+ Tracer.stop
//Thread.sleep(3000)
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index c543123c..f3e1828d 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -17,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti
class ActorRefTellInstrumentation {
import ProceedingJoinPointPimp._
- @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)")
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && target(actor) && args(message, sender)")
def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(actor, message, sender)")
@@ -25,16 +25,8 @@ class ActorRefTellInstrumentation {
val actorName = MetricDirectory.nameForActor(actor)
val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
- if(!actor.toString().contains("StandardOutLogger")) {
- println("Skipped the actor")
- pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender)
+ pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender)
- }
- else {
- println("Got the standardLogger!!")
- pjp.proceed()
- }
}
}
@@ -55,7 +47,7 @@ class ActorCellInvokeInstrumentation {
val actorName = MetricDirectory.nameForActor(ref)
val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
- println("=====> Created ActorCell for: "+ref.toString())
+ //println("=====> Created ActorCell for: "+ref.toString())
/** TODO: Find a better way to filter the things we don't want to measure. */
//if(system.name != "kamon" && actorName.startsWith("/user")) {
processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
@@ -64,14 +56,14 @@ class ActorCellInvokeInstrumentation {
}
- @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
@Around("invokingActorBehaviourAtActorCell(envelope)")
def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
import ProceedingJoinPointPimp._
- println("ENVELOPE --------------------->"+envelope)
+ //println("ENVELOPE --------------------->"+envelope)
envelope match {
case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
timer.stop()
@@ -83,7 +75,36 @@ class ActorCellInvokeInstrumentation {
ctx match {
case Some(c) => {
Tracer.set(c)
- println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ pjp.proceedWith(originalEnvelope)
+ Tracer.clear
+ }
+ case None => pjp.proceedWith(originalEnvelope)
+ }
+ pt.stop()
+ }
+ case _ => pjp.proceed
+ }
+ }
+
+
+ @Pointcut("execution(* spray.can.server.ResponseReceiverRef.handle(*)) && args(message)")
+ def sprayResponderHandle(message: AnyRef) = {}
+
+ @Around("sprayResponderHandle(message)")
+ def sprayInvokeAround(pjp: ProceedingJoinPoint, message: AnyRef): Unit = {
+ import ProceedingJoinPointPimp._
+ message match {
+ case TraceableMessage(ctx, msg, timer) => {
+ timer.stop()
+
+ val originalEnvelope: AnyRef = msg.asInstanceOf[AnyRef]
+
+ //println("PROCESSING TIME TIMER: "+processingTimeTimer)
+ val pt = processingTimeTimer.time()
+ ctx match {
+ case Some(c) => {
+ Tracer.set(c)
pjp.proceedWith(originalEnvelope)
Tracer.clear
}