aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-09-30 11:21:25 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-09-30 11:21:25 -0300
commitd7f1e195da11f977d5fdbf74598e499156de8dc4 (patch)
tree58b6650bcc01c38f0cfb6fb32bbf4921d4c4b79f /kamon-core
parent604d5801332838f8bea25fe25cb8df5dbb82af08 (diff)
downloadKamon-d7f1e195da11f977d5fdbf74598e499156de8dc4.tar.gz
Kamon-d7f1e195da11f977d5fdbf74598e499156de8dc4.tar.bz2
Kamon-d7f1e195da11f977d5fdbf74598e499156de8dc4.zip
wip
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/application.conf6
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala8
-rw-r--r--kamon-core/src/main/scala/test/PingPong.scala31
-rw-r--r--kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala22
6 files changed, 78 insertions, 15 deletions
diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf
index d51f6b15..c87c0ced 100644
--- a/kamon-core/src/main/resources/application.conf
+++ b/kamon-core/src/main/resources/application.conf
@@ -1,7 +1,7 @@
akka {
- loglevel = DEBUG
- stdout-loglevel = DEBUG
- log-dead-letters = on
+ loglevel = INFO
+ stdout-loglevel = INFO
+ log-dead-letters = off
#extensions = ["kamon.dashboard.DashboardExtension"]
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index d92d7f6c..df124f41 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -64,8 +64,11 @@ class ActorCellInvokeInstrumentation {
Tracer.clear
//MDC.remove("uow")
}
- case None => pjp.proceedWith(originalEnvelope)
+ case None =>
+ assert(Tracer.context() == None)
+ pjp.proceedWith(originalEnvelope)
}
+ Tracer.clear
}
}
@@ -79,6 +82,7 @@ class UnregisteredActorRefInstrumentation {
def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
import ProceedingJoinPointPimp._
//println("Handling unregistered actor ref message: "+message)
+
message match {
case SimpleTraceMessage(msg, ctx) => {
ctx match {
@@ -87,10 +91,14 @@ class UnregisteredActorRefInstrumentation {
pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation.
Tracer.clear
}
- case None => pjp.proceedWith(msg.asInstanceOf[AnyRef])
+ case None =>
+ assert(Tracer.context() == None)
+ pjp.proceedWith(msg.asInstanceOf[AnyRef])
}
}
- case _ => pjp.proceed
+ case _ =>
+ //assert(Tracer.context() == None)
+ pjp.proceed
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index 30041321..393293f1 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -37,7 +37,7 @@ class RunnableInstrumentation {
* Aspect members
*/
- private val traceContext = Tracer.context
+ private var traceContext = Tracer.context
/**
@@ -47,15 +47,21 @@ class RunnableInstrumentation {
@Before("instrumentedRunnableCreation()")
def beforeCreation = {
- //println((new Throwable).getStackTraceString)
+ traceContext = Tracer.context
+ /* if(traceContext.isEmpty)
+ println("NO TRACE CONTEXT FOR RUNNABLE at: [[[%s]]]", (new Throwable).getStackTraceString)//println((new Throwable).getStackTraceString)
+ else
+ println("SUPER TRACE CONTEXT FOR RUNNABLE at: [[[%s]]]", (new Throwable).getStackTraceString)*/
}
@Around("runnableExecution()")
def around(pjp: ProceedingJoinPoint) = {
import pjp._
-
+ /*if(traceContext.isEmpty)
+ println("OOHHH NOOOOO")*/
withContext(traceContext, proceed())
}
}
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
index 9422a9f7..4eafcebe 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
@@ -32,7 +32,7 @@ class SprayServerInstrumentation {
//def afterInit(): Unit = {
Tracer.start
//openRequest.traceContext
- println("Created the context: " + Tracer.context() + " for the transaction: " + request)
+ //println("Created the context: " + Tracer.context() + " for the transaction: " + request)
Tracer.context().map(_.entries ! Rename(request.uri.path.toString()))
}
@@ -41,13 +41,13 @@ class SprayServerInstrumentation {
@After("openRequestCreation()")
def afterFinishingRequest(): Unit = {
- println("Finishing a request: " + Tracer.context())
+ //println("Finishing a request: " + Tracer.context())
Tracer.context().map(_.entries ! Finish())
-
+/*
if(Tracer.context().isEmpty) {
println("WOOOOOPAAAAAAAAA")
- }
+ }*/
}
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
index 93aa322d..c942c6ab 100644
--- a/kamon-core/src/main/scala/test/PingPong.scala
+++ b/kamon-core/src/main/scala/test/PingPong.scala
@@ -1,11 +1,12 @@
package test
-import akka.actor.{Deploy, Props, Actor, ActorSystem}
+import akka.actor._
import java.util.concurrent.atomic.AtomicLong
import kamon.Tracer
import spray.routing.SimpleRoutingApp
import akka.util.Timeout
import spray.httpx.RequestBuilding
+import scala.concurrent.Future
object PingPong extends App {
import scala.concurrent.duration._
@@ -54,6 +55,7 @@ class Ponger extends Actor {
object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding {
import scala.concurrent.duration._
import spray.client.pipelining._
+ import akka.pattern.ask
implicit val system = ActorSystem("test")
import system.dispatcher
@@ -61,6 +63,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
implicit val timeout = Timeout(30 seconds)
val pipeline = sendReceive
+ val replier = system.actorOf(Props[Replier])
startServer(interface = "localhost", port = 9090) {
get {
@@ -68,8 +71,34 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
complete {
pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok")
}
+ } ~
+ path("reply") {
+ complete {
+ if (Tracer.context().isEmpty)
+ println("ROUTE NO CONTEXT")
+
+ (replier ? "replytome").mapTo[String]
+ }
+ } ~
+ path("ok") {
+ complete("ok")
+ } ~
+ path("future") {
+ dynamic {
+ complete(Future { "OK" })
+ }
}
}
}
}
+
+class Replier extends Actor with ActorLogging {
+ def receive = {
+ case _ =>
+ if(Tracer.context.isEmpty)
+ log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT")
+
+ sender ! "Ok"
+ }
+}
diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
index ccc7740b..454b4514 100644
--- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
@@ -8,6 +8,7 @@ import kamon.{TraceContext, Tracer}
import akka.pattern.{pipe, ask}
import akka.util.Timeout
import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
import akka.routing.RoundRobinRouter
@@ -35,11 +36,30 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation
expectMsg(Some(testTraceContext))
}
- "propagate the trace context to actors behind a rounter" in new RoutedTraceContextEchoFixture {
+ "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture {
val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test"))
expectMsgAllOf(contexts: _*)
}
+
+ "propagate with many asks" in {
+ val echo = system.actorOf(Props[TraceContextEcho])
+ val iterations = 50000
+ implicit val timeout = Timeout(10 seconds)
+
+ val futures = for(_ <- 1 to iterations) yield {
+ Tracer.start
+ val result = (echo ? "test")
+ Tracer.clear
+
+ result
+ }
+
+ val allResults = Await.result(Future.sequence(futures), 10 seconds)
+ assert(iterations == allResults.collect {
+ case Some(_) => 1
+ }.sum)
+ }
}
}