aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-12 19:00:49 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-12 19:00:49 -0300
commit18656c723881ebfd8ade43a990fe73beba5690d0 (patch)
tree34b563bc5f8913f054df15dad47a8d8e4b5b0e36
parent66957f2632eaccae4e3a354b8787fded8c6369d2 (diff)
downloadKamon-18656c723881ebfd8ade43a990fe73beba5690d0.tar.gz
Kamon-18656c723881ebfd8ade43a990fe73beba5690d0.tar.bz2
Kamon-18656c723881ebfd8ade43a990fe73beba5690d0.zip
fixed the instrumentation to work nicely with spray
-rw-r--r--kamon-core/src/main/resources/application.conf3
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala36
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/executor/eventbus.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala49
-rw-r--r--kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala42
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala14
-rw-r--r--kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala4
-rw-r--r--kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala6
9 files changed, 104 insertions, 83 deletions
diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf
index fb69ecd4..06bdf13a 100644
--- a/kamon-core/src/main/resources/application.conf
+++ b/kamon-core/src/main/resources/application.conf
@@ -19,6 +19,9 @@ akka {
throughput = 100
}
+ debug {
+ unhandled = on
+ }
}
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index c3080909..07773c55 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -8,33 +8,8 @@ import scala.concurrent.duration.{FiniteDuration, Duration}
import com.newrelic.api.agent.NewRelic
object Kamon {
-
- val ctx = new ThreadLocal[Option[TraceContext]] {
- override def initialValue() = None
- }
-
implicit lazy val actorSystem = ActorSystem("kamon")
-
- def context() = ctx.get()
- def clear = ctx.remove()
- def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
-
- def start = set(newTraceContext)
- def stop = ctx.get match {
- case Some(context) => context.close
- case None =>
- }
-
- def newTraceContext(): TraceContext = TraceContext()
-
-
- val publisher = actorSystem.actorOf(Props[TransactionPublisher])
-
- def publish(tx: FullTransaction) = publisher ! tx
-
-
-
object Metric {
val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
@@ -44,21 +19,12 @@ object Kamon {
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
}
-
-
val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
}
-
-
-
-
-
-
-
object Tracer {
val ctx = new ThreadLocal[Option[TraceContext]] {
override def initialValue() = None
@@ -74,7 +40,7 @@ object Tracer {
case None =>
}
- //def newTraceContext(): TraceContext = TraceContext()
+ def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem)
}
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 6b32550f..62d7f57e 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -1,31 +1,34 @@
package kamon
import java.util.UUID
-import akka.actor.{ActorSystem, ActorPath}
+import akka.actor._
import akka.agent.Agent
import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success}
import akka.util.Timeout
-case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) {
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
implicit val as = Kamon.actorSystem.dispatcher
- def append(entry: TraceEntry) = entries send (entry :: _)
- def close = entries.future.onComplete({
- case Success(list) => Kamon.publish(FullTransaction(id, list))
- case Failure(t) => println("WTF!")
- })
+ def append(entry: TraceEntry) = entries ! entry
+ def close = entries ! "Close" // TODO type this thing!.
}
object TraceContext {
- implicit val as2 = Kamon.actorSystem.dispatcher
- def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+ def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
}
+class TraceAccumulator extends Actor {
+ def receive = {
+ case a => println("Trace Accumulated: "+a)
+ }
+}
+
+
trait TraceEntry
case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
index 599f2a7a..33ff4a4e 100644
--- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -5,7 +5,7 @@ import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit
-import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
+import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
import scala.util.{Random, Success, Failure}
import scala.concurrent.Future
@@ -66,14 +66,14 @@ object TryAkka extends App{
}
}))
- Kamon.start
+ Tracer.start
for(i <- 1 to 4) {
val ping = system.actorOf(Props[PingActor])
ping ! Pong()
}
- def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body")
/*
val newRelicReporter = new NewRelicReporter(registry)
@@ -86,13 +86,13 @@ object TryAkka extends App{
- Kamon.start
+ Tracer.start
- Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
+ Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
threadPrintln("Before doing it")
val f = Future { threadPrintln("This is happening inside the future body") }
- Kamon.stop
+ Tracer.stop
//Thread.sleep(3000)
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index c543123c..f3e1828d 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -17,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti
class ActorRefTellInstrumentation {
import ProceedingJoinPointPimp._
- @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)")
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && target(actor) && args(message, sender)")
def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(actor, message, sender)")
@@ -25,16 +25,8 @@ class ActorRefTellInstrumentation {
val actorName = MetricDirectory.nameForActor(actor)
val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
- if(!actor.toString().contains("StandardOutLogger")) {
- println("Skipped the actor")
- pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender)
+ pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender)
- }
- else {
- println("Got the standardLogger!!")
- pjp.proceed()
- }
}
}
@@ -55,7 +47,7 @@ class ActorCellInvokeInstrumentation {
val actorName = MetricDirectory.nameForActor(ref)
val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
- println("=====> Created ActorCell for: "+ref.toString())
+ //println("=====> Created ActorCell for: "+ref.toString())
/** TODO: Find a better way to filter the things we don't want to measure. */
//if(system.name != "kamon" && actorName.startsWith("/user")) {
processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
@@ -64,14 +56,14 @@ class ActorCellInvokeInstrumentation {
}
- @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
@Around("invokingActorBehaviourAtActorCell(envelope)")
def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
import ProceedingJoinPointPimp._
- println("ENVELOPE --------------------->"+envelope)
+ //println("ENVELOPE --------------------->"+envelope)
envelope match {
case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
timer.stop()
@@ -83,7 +75,36 @@ class ActorCellInvokeInstrumentation {
ctx match {
case Some(c) => {
Tracer.set(c)
- println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ pjp.proceedWith(originalEnvelope)
+ Tracer.clear
+ }
+ case None => pjp.proceedWith(originalEnvelope)
+ }
+ pt.stop()
+ }
+ case _ => pjp.proceed
+ }
+ }
+
+
+ @Pointcut("execution(* spray.can.server.ResponseReceiverRef.handle(*)) && args(message)")
+ def sprayResponderHandle(message: AnyRef) = {}
+
+ @Around("sprayResponderHandle(message)")
+ def sprayInvokeAround(pjp: ProceedingJoinPoint, message: AnyRef): Unit = {
+ import ProceedingJoinPointPimp._
+ message match {
+ case TraceableMessage(ctx, msg, timer) => {
+ timer.stop()
+
+ val originalEnvelope: AnyRef = msg.asInstanceOf[AnyRef]
+
+ //println("PROCESSING TIME TIMER: "+processingTimeTimer)
+ val pt = processingTimeTimer.time()
+ ctx match {
+ case Some(c) => {
+ Tracer.set(c)
pjp.proceedWith(originalEnvelope)
Tracer.clear
}
diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
index 0026d953..ccc7740b 100644
--- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
@@ -1,13 +1,18 @@
package akka.instrumentation
import org.scalatest.{WordSpecLike, Matchers}
-import akka.actor.{Actor, Props, ActorSystem}
+import akka.actor.{ActorRef, Actor, Props, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit}
-import kamon.{TraceContext, Kamon}
+import kamon.{TraceContext, Tracer}
+import akka.pattern.{pipe, ask}
+import akka.util.Timeout
+import scala.concurrent.duration._
+import akka.routing.RoundRobinRouter
class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender {
+ implicit val executionContext = system.dispatcher
"an instrumented actor ref" when {
"used inside the context of a transaction" should {
@@ -17,28 +22,51 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation
expectMsg(Some(testTraceContext))
}
- "propagate the trace context using tell" in {
+ "propagate the trace context using tell" in new TraceContextEchoFixture {
+ echo.tell("test", testActor)
+ expectMsg(Some(testTraceContext))
+ }
+
+ "propagate the trace context using ask" in new TraceContextEchoFixture {
+ implicit val timeout = Timeout(1 seconds)
+ (echo ? "test") pipeTo(testActor)
+
+ expectMsg(Some(testTraceContext))
}
- "propagate the trace context using ask" in {
+ "propagate the trace context to actors behind a rounter" in new RoutedTraceContextEchoFixture {
+ val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test"))
+ expectMsgAllOf(contexts: _*)
}
}
}
trait TraceContextEchoFixture {
- val testTraceContext = Kamon.newTraceContext()
+ val testTraceContext = Tracer.newTraceContext()
val echo = system.actorOf(Props[TraceContextEcho])
- Kamon.set(testTraceContext)
+ Tracer.set(testTraceContext)
+ }
+
+ trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture {
+ override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10)))
+
+ def tellWithNewContext(target: ActorRef, message: Any): TraceContext = {
+ val context = Tracer.newTraceContext()
+ Tracer.set(context)
+
+ target ! message
+ context
+ }
}
}
class TraceContextEcho extends Actor {
def receive = {
- case msg ⇒ sender ! Kamon.context()
+ case msg: String ⇒ sender ! Tracer.context()
}
}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
index de65aaca..fe89695b 100644
--- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
@@ -3,7 +3,7 @@ package kamon.instrumentation
import scala.concurrent.{Await, Promise, Future}
import org.scalatest.{Matchers, OptionValues, WordSpec}
import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration}
-import kamon.{Kamon, TraceContext}
+import kamon.{Tracer, Kamon, TraceContext}
import java.util.UUID
import scala.util.Success
import scala.concurrent.duration._
@@ -27,7 +27,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur
val onCompleteContext = Promise[TraceContext]()
futureWithContext.onComplete({
- case _ => onCompleteContext.complete(Success(Kamon.context.get))
+ case _ => onCompleteContext.complete(Success(Tracer.context.get))
})
whenReady(onCompleteContext.future) { result =>
@@ -49,7 +49,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur
val onCompleteContext = Promise[Option[TraceContext]]()
futureWithoutContext.onComplete({
- case _ => onCompleteContext.complete(Success(Kamon.context))
+ case _ => onCompleteContext.complete(Success(Tracer.context))
})
whenReady(onCompleteContext.future) { result =>
@@ -68,14 +68,14 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur
class FutureWithContextFixture {
val testContext = TraceContext()
- Kamon.set(testContext)
+ Tracer.set(testContext)
- val futureWithContext = Future { Kamon.context}
+ val futureWithContext = Future { Tracer.context}
}
trait FutureWithoutContextFixture {
- Kamon.clear // Make sure no TraceContext is available
- val futureWithoutContext = Future { Kamon.context }
+ Tracer.clear // Make sure no TraceContext is available
+ val futureWithoutContext = Future { Tracer.context }
}
}
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
index e117db1b..2347bbe9 100644
--- a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
+++ b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
@@ -1,13 +1,13 @@
package kamon.logging
import akka.actor.Actor
-import kamon.Kamon
+import kamon.{Tracer, Kamon}
trait UowActorLogging {
self: Actor =>
def logWithUOW(text: String) = {
- val uow = Kamon.context.map(_.userContext).getOrElse("NA")
+ val uow = Tracer.context.map(_.userContext).getOrElse("NA")
println(s"=======>[$uow] - $text")
}
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
index e79602ea..58473f13 100644
--- a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
+++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
@@ -5,13 +5,13 @@ import spray.routing.Directive0
import spray.routing.directives.BasicDirectives
import java.net.InetAddress
import scala.util.Try
-import kamon.Kamon
+import kamon.{Tracer, Kamon}
trait UowDirectives extends BasicDirectives {
def uow: Directive0 = mapRequest { request =>
val generatedUow = Some(UowDirectives.newUow)
println("Generated UOW: "+generatedUow)
- Kamon.set(Kamon.newTraceContext().copy(userContext = generatedUow))
+ Tracer.set(Tracer.newTraceContext().copy(userContext = generatedUow))
request
@@ -21,7 +21,7 @@ trait UowDirectives extends BasicDirectives {
object UowDirectives {
val uowCounter = new AtomicLong
- val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName.toString).getOrElse("unknown-localhost")
+ val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet())