From b2c85b020185c9d5bcc89cfe9d4166405d1b404a Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 8 Aug 2013 15:18:35 -0300 Subject: wip --- .../ActorRefTellInstrumentation.scala | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 82915ce9..c543123c 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -3,12 +3,11 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Kamon, TraceContext} +import kamon.{Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} import kamon.metric.{MetricDirectory, Metrics} import com.codahale.metrics -import kamon.instrumentation.TraceableMessage import scala.Some case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -18,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") @@ -27,7 +26,15 @@ class ActorRefTellInstrumentation { val actorName = MetricDirectory.nameForActor(actor) val t = Metrics.registry.timer(actorName + "LATENCY") //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") - pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + if(!actor.toString().contains("StandardOutLogger")) { + println("Skipped the actor") + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) + + } + else { + println("Got the standardLogger!!") + pjp.proceed() + } } } @@ -48,6 +55,7 @@ class ActorCellInvokeInstrumentation { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + println("=====> Created ActorCell for: "+ref.toString()) /** TODO: Find a better way to filter the things we don't want to measure. */ //if(system.name != "kamon" && actorName.startsWith("/user")) { processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") @@ -74,10 +82,10 @@ class ActorCellInvokeInstrumentation { val pt = processingTimeTimer.time() ctx match { case Some(c) => { - Kamon.set(c) + Tracer.set(c) println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) pjp.proceedWith(originalEnvelope) - Kamon.clear + Tracer.clear } case None => pjp.proceedWith(originalEnvelope) } -- cgit v1.2.3 From 36ca84c0505c65e7c4947d0b0a7edf12fcdec48e Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 12 Aug 2013 19:00:49 -0300 Subject: fixed the instrumentation to work nicely with spray --- kamon-core/src/main/resources/application.conf | 3 ++ kamon-core/src/main/scala/kamon/Kamon.scala | 36 +--------------- kamon-core/src/main/scala/kamon/TraceContext.scala | 21 ++++++---- .../src/main/scala/kamon/executor/eventbus.scala | 12 +++--- .../ActorRefTellInstrumentation.scala | 49 +++++++++++++++------- .../instrumentation/ActorInstrumentationSpec.scala | 42 +++++++++++++++---- .../RunnableInstrumentationSpec.scala | 14 +++---- 7 files changed, 99 insertions(+), 78 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index fb69ecd4..06bdf13a 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -19,6 +19,9 @@ akka { throughput = 100 } + debug { + unhandled = on + } } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index c3080909..07773c55 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -8,33 +8,8 @@ import scala.concurrent.duration.{FiniteDuration, Duration} import com.newrelic.api.agent.NewRelic object Kamon { - - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - implicit lazy val actorSystem = ActorSystem("kamon") - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - def newTraceContext(): TraceContext = TraceContext() - - - val publisher = actorSystem.actorOf(Props[TransactionPublisher]) - - def publish(tx: FullTransaction) = publisher ! tx - - - object Metric { val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala @@ -44,21 +19,12 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } - - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") } - - - - - - - object Tracer { val ctx = new ThreadLocal[Option[TraceContext]] { override def initialValue() = None @@ -74,7 +40,7 @@ object Tracer { case None => } - //def newTraceContext(): TraceContext = TraceContext() + def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) } diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 6b32550f..62d7f57e 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -1,31 +1,34 @@ package kamon import java.util.UUID -import akka.actor.{ActorSystem, ActorPath} +import akka.actor._ import akka.agent.Agent import java.util.concurrent.TimeUnit import scala.util.{Failure, Success} import akka.util.Timeout -case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { +case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { implicit val timeout = Timeout(30, TimeUnit.SECONDS) implicit val as = Kamon.actorSystem.dispatcher - def append(entry: TraceEntry) = entries send (entry :: _) - def close = entries.future.onComplete({ - case Success(list) => Kamon.publish(FullTransaction(id, list)) - case Failure(t) => println("WTF!") - }) + def append(entry: TraceEntry) = entries ! entry + def close = entries ! "Close" // TODO type this thing!. } object TraceContext { - implicit val as2 = Kamon.actorSystem.dispatcher - def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) + def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } +class TraceAccumulator extends Actor { + def receive = { + case a => println("Trace Accumulated: "+a) + } +} + + trait TraceEntry case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index 599f2a7a..33ff4a4e 100644 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -5,7 +5,7 @@ import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout import scala.util.{Random, Success, Failure} import scala.concurrent.Future @@ -66,14 +66,14 @@ object TryAkka extends App{ } })) - Kamon.start + Tracer.start for(i <- 1 to 4) { val ping = system.actorOf(Props[PingActor]) ping ! Pong() } - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -86,13 +86,13 @@ object TryAkka extends App{ - Kamon.start + Tracer.start - Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } - Kamon.stop + Tracer.stop //Thread.sleep(3000) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index c543123c..f3e1828d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -17,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") @@ -25,16 +25,8 @@ class ActorRefTellInstrumentation { val actorName = MetricDirectory.nameForActor(actor) val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") - if(!actor.toString().contains("StandardOutLogger")) { - println("Skipped the actor") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) - } - else { - println("Got the standardLogger!!") - pjp.proceed() - } } } @@ -55,7 +47,7 @@ class ActorCellInvokeInstrumentation { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - println("=====> Created ActorCell for: "+ref.toString()) + //println("=====> Created ActorCell for: "+ref.toString()) /** TODO: Find a better way to filter the things we don't want to measure. */ //if(system.name != "kamon" && actorName.startsWith("/user")) { processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") @@ -64,14 +56,14 @@ class ActorCellInvokeInstrumentation { } - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - println("ENVELOPE --------------------->"+envelope) + //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { timer.stop() @@ -83,7 +75,36 @@ class ActorCellInvokeInstrumentation { ctx match { case Some(c) => { Tracer.set(c) - println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Tracer.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } + + + @Pointcut("execution(* spray.can.server.ResponseReceiverRef.handle(*)) && args(message)") + def sprayResponderHandle(message: AnyRef) = {} + + @Around("sprayResponderHandle(message)") + def sprayInvokeAround(pjp: ProceedingJoinPoint, message: AnyRef): Unit = { + import ProceedingJoinPointPimp._ + message match { + case TraceableMessage(ctx, msg, timer) => { + timer.stop() + + val originalEnvelope: AnyRef = msg.asInstanceOf[AnyRef] + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Tracer.set(c) pjp.proceedWith(originalEnvelope) Tracer.clear } diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index 0026d953..ccc7740b 100644 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -1,13 +1,18 @@ package akka.instrumentation import org.scalatest.{WordSpecLike, Matchers} -import akka.actor.{Actor, Props, ActorSystem} +import akka.actor.{ActorRef, Actor, Props, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} -import kamon.{TraceContext, Kamon} +import kamon.{TraceContext, Tracer} +import akka.pattern.{pipe, ask} +import akka.util.Timeout +import scala.concurrent.duration._ +import akka.routing.RoundRobinRouter class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { + implicit val executionContext = system.dispatcher "an instrumented actor ref" when { "used inside the context of a transaction" should { @@ -17,28 +22,51 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation expectMsg(Some(testTraceContext)) } - "propagate the trace context using tell" in { + "propagate the trace context using tell" in new TraceContextEchoFixture { + echo.tell("test", testActor) + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + (echo ? "test") pipeTo(testActor) + + expectMsg(Some(testTraceContext)) } - "propagate the trace context using ask" in { + "propagate the trace context to actors behind a rounter" in new RoutedTraceContextEchoFixture { + val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) + expectMsgAllOf(contexts: _*) } } } trait TraceContextEchoFixture { - val testTraceContext = Kamon.newTraceContext() + val testTraceContext = Tracer.newTraceContext() val echo = system.actorOf(Props[TraceContextEcho]) - Kamon.set(testTraceContext) + Tracer.set(testTraceContext) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) + + def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { + val context = Tracer.newTraceContext() + Tracer.set(context) + + target ! message + context + } } } class TraceContextEcho extends Actor { def receive = { - case msg ⇒ sender ! Kamon.context() + case msg: String ⇒ sender ! Tracer.context() } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index de65aaca..fe89695b 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -3,7 +3,7 @@ package kamon.instrumentation import scala.concurrent.{Await, Promise, Future} import org.scalatest.{Matchers, OptionValues, WordSpec} import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} -import kamon.{Kamon, TraceContext} +import kamon.{Tracer, Kamon, TraceContext} import java.util.UUID import scala.util.Success import scala.concurrent.duration._ @@ -27,7 +27,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur val onCompleteContext = Promise[TraceContext]() futureWithContext.onComplete({ - case _ => onCompleteContext.complete(Success(Kamon.context.get)) + case _ => onCompleteContext.complete(Success(Tracer.context.get)) }) whenReady(onCompleteContext.future) { result => @@ -49,7 +49,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur val onCompleteContext = Promise[Option[TraceContext]]() futureWithoutContext.onComplete({ - case _ => onCompleteContext.complete(Success(Kamon.context)) + case _ => onCompleteContext.complete(Success(Tracer.context)) }) whenReady(onCompleteContext.future) { result => @@ -68,14 +68,14 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur class FutureWithContextFixture { val testContext = TraceContext() - Kamon.set(testContext) + Tracer.set(testContext) - val futureWithContext = Future { Kamon.context} + val futureWithContext = Future { Tracer.context} } trait FutureWithoutContextFixture { - Kamon.clear // Make sure no TraceContext is available - val futureWithoutContext = Future { Kamon.context } + Tracer.clear // Make sure no TraceContext is available + val futureWithoutContext = Future { Tracer.context } } } -- cgit v1.2.3 From e7a33a53b2eb3c25fea0cb6da2b54b84c94761bd Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 13 Aug 2013 18:50:19 -0300 Subject: wip --- kamon-core/src/main/resources/META-INF/aop.xml | 1 + kamon-core/src/main/resources/application.conf | 1 - .../ActorRefTellInstrumentation.scala | 24 +++++++++++----------- 3 files changed, 13 insertions(+), 13 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index e6d61fa1..0f1895ec 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -9,6 +9,7 @@ + diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index 06bdf13a..1378e75e 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -1,7 +1,6 @@ akka { loglevel = DEBUG stdout-loglevel = DEBUG - log-dead-letters = on actor { default-dispatcher { fork-join-executor { diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index f3e1828d..36dd9d0b 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -17,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && target(actor) && args(message, sender)") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") @@ -25,6 +25,7 @@ class ActorRefTellInstrumentation { val actorName = MetricDirectory.nameForActor(actor) val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) } @@ -86,31 +87,30 @@ class ActorCellInvokeInstrumentation { case _ => pjp.proceed } } +} - @Pointcut("execution(* spray.can.server.ResponseReceiverRef.handle(*)) && args(message)") - def sprayResponderHandle(message: AnyRef) = {} +@Aspect +class UnregisteredActorRefInstrumentation { + @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)") + def sprayResponderHandle(message: Any, sender: ActorRef) = {} - @Around("sprayResponderHandle(message)") - def sprayInvokeAround(pjp: ProceedingJoinPoint, message: AnyRef): Unit = { + @Around("sprayResponderHandle(message, sender)") + def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { import ProceedingJoinPointPimp._ + println("Handling unregistered actor ref message: "+message) message match { case TraceableMessage(ctx, msg, timer) => { timer.stop() - val originalEnvelope: AnyRef = msg.asInstanceOf[AnyRef] - - //println("PROCESSING TIME TIMER: "+processingTimeTimer) - val pt = processingTimeTimer.time() ctx match { case Some(c) => { Tracer.set(c) - pjp.proceedWith(originalEnvelope) + pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. Tracer.clear } - case None => pjp.proceedWith(originalEnvelope) + case None => pjp.proceedWith(msg.asInstanceOf[AnyRef]) } - pt.stop() } case _ => pjp.proceed } -- cgit v1.2.3 From b678e3250576a0352bfc1d8c4ee5ee2a62b75715 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Fri, 23 Aug 2013 04:19:17 -0300 Subject: WIP:first implementation of Kamon Dashboard --- kamon-core/src/main/main.iml | 12 ++++++++++++ kamon-core/src/main/resources/application.conf | 3 +++ kamon-core/src/main/scala/kamon/Kamon.scala | 12 ++++++------ kamon-core/src/main/scala/kamon/executor/eventbus.scala | 6 +++--- .../kamon/instrumentation/ActorRefTellInstrumentation.scala | 3 +-- kamon-core/src/main/scala/kamon/metric/Metrics.scala | 6 +++--- kamon-core/src/test/test.iml | 12 ++++++++++++ 7 files changed, 40 insertions(+), 14 deletions(-) create mode 100644 kamon-core/src/main/main.iml create mode 100644 kamon-core/src/test/test.iml (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/main.iml b/kamon-core/src/main/main.iml new file mode 100644 index 00000000..702d93f1 --- /dev/null +++ b/kamon-core/src/main/main.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index 1378e75e..2f8d8d87 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -1,6 +1,9 @@ akka { loglevel = DEBUG stdout-loglevel = DEBUG + + extensions = ["kamon.dashboard.DashboardExtension"] + actor { default-dispatcher { fork-join-executor { diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 07773c55..298f43eb 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,17 +1,17 @@ package kamon import akka.actor.{Actor, Props, ActorSystem} -import scala.collection.JavaConverters._ -import java.util.concurrent.ConcurrentHashMap -import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} -import scala.concurrent.duration.{FiniteDuration, Duration} +import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} +import scala.concurrent.duration.FiniteDuration import com.newrelic.api.agent.NewRelic +import scala.collection.concurrent.TrieMap object Kamon { implicit lazy val actorSystem = ActorSystem("kamon") object Metric { - val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala + + val actorSystems = TrieMap.empty[String, ActorSystemMetrics] def actorSystemNames: List[String] = actorSystems.keys.toList def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) @@ -34,7 +34,7 @@ object Tracer { def clear = ctx.remove() def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - def start = ??? //set(newTraceContext) + def start = set(newTraceContext) def stop = ctx.get match { case Some(context) => context.close case None => diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index 33ff4a4e..a1c099d4 100644 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -36,7 +36,7 @@ case class Pong() class PingActor extends Actor with ActorLogging { - val pong = context.actorOf(Props[PongActor]) + val pong = context.actorOf(Props[PongActor], "Pong") val random = new Random() def receive = { case Pong() => { @@ -68,7 +68,7 @@ object TryAkka extends App{ Tracer.start for(i <- 1 to 4) { - val ping = system.actorOf(Props[PingActor]) + val ping = system.actorOf(Props[PingActor], "Ping" + i) ping ! Pong() } @@ -86,7 +86,7 @@ object TryAkka extends App{ - Tracer.start + //Tracer.start Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 36dd9d0b..212eab2c 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -5,9 +5,8 @@ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} -import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} -import com.codahale.metrics import scala.Some case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala index cdc0a334..edf532ae 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -1,9 +1,10 @@ package kamon.metric -import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} +import java.util.concurrent.TimeUnit import akka.actor.ActorRef import com.codahale.metrics import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} +import scala.collection.concurrent.TrieMap object Metrics { @@ -85,8 +86,7 @@ trait HistogramSnapshot { case class ActorSystemMetrics(actorSystemName: String) { - import scala.collection.JavaConverters._ - val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala + val dispatchers = TrieMap.empty[String, DispatcherMetricCollector] private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) diff --git a/kamon-core/src/test/test.iml b/kamon-core/src/test/test.iml new file mode 100644 index 00000000..90381726 --- /dev/null +++ b/kamon-core/src/test/test.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + -- cgit v1.2.3 From d882e8e8ca6da081feac96f3d4eea197a9f84d12 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 26 Aug 2013 19:24:56 -0300 Subject: Some sort of basic logging with UOW --- kamon-core/src/main/resources/META-INF/aop.xml | 2 +- .../kamon/instrumentation/ActorRefTellInstrumentation.scala | 10 ++++++---- .../main/scala/kamon/instrumentation/MessageQueueMetrics.scala | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 0f1895ec..0f427611 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -19,7 +19,7 @@ - + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 212eab2c..7d3e36ca 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -16,16 +16,18 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ + val t2 = Metrics.registry.timer("some" + "LATENCY") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(actor) - val t = Metrics.registry.timer(actorName + "LATENCY") + //val actorName = MetricDirectory.nameForActor(actor) + //val t = Metrics.registry.timer(actorName + "LATENCY") //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) } } @@ -66,7 +68,7 @@ class ActorCellInvokeInstrumentation { //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - timer.stop() + //timer.stop() val originalEnvelope = envelope.copy(message = msg) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index c21502ac..6a1e291f 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -48,12 +48,12 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def enqueue(receiver: ActorRef, handle: Envelope) = { delegate.enqueue(receiver, handle) - queueSizeHistogram.update(numberOfMessages) + //queueSizeHistogram.update(numberOfMessages) } def dequeue(): Envelope = { val envelope = delegate.dequeue() - queueSizeHistogram.update(numberOfMessages) + //queueSizeHistogram.update(numberOfMessages) envelope } -- cgit v1.2.3