diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2013-08-26 22:25:08 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2013-08-26 22:25:08 -0300 |
commit | 72a16d4396f5f6dc745c2376d4e50475d4714004 (patch) | |
tree | 21213af0c9baae0e84bbe1326a8e8f9d494ca8e4 /kamon-core/src/main/scala/kamon | |
parent | d8a8a0cc0efb79697605efed1efbaf99b98921dd (diff) | |
parent | 30ee4d88346448066c5ae0f12b683343b678577f (diff) | |
download | Kamon-72a16d4396f5f6dc745c2376d4e50475d4714004.tar.gz Kamon-72a16d4396f5f6dc745c2376d4e50475d4714004.tar.bz2 Kamon-72a16d4396f5f6dc745c2376d4e50475d4714004.zip |
Merge branch 'master' of github.com:dpsoft/Kamon
Diffstat (limited to 'kamon-core/src/main/scala/kamon')
8 files changed, 81 insertions, 82 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index c3080909..298f43eb 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,42 +1,17 @@ package kamon import akka.actor.{Actor, Props, ActorSystem} -import scala.collection.JavaConverters._ -import java.util.concurrent.ConcurrentHashMap -import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} -import scala.concurrent.duration.{FiniteDuration, Duration} +import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} +import scala.concurrent.duration.FiniteDuration import com.newrelic.api.agent.NewRelic +import scala.collection.concurrent.TrieMap object Kamon { - - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - implicit lazy val actorSystem = ActorSystem("kamon") - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - def newTraceContext(): TraceContext = TraceContext() - - - val publisher = actorSystem.actorOf(Props[TransactionPublisher]) - - def publish(tx: FullTransaction) = publisher ! tx - - - object Metric { - val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala + + val actorSystems = TrieMap.empty[String, ActorSystemMetrics] def actorSystemNames: List[String] = actorSystems.keys.toList def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) @@ -44,21 +19,12 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } - - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") } - - - - - - - object Tracer { val ctx = new ThreadLocal[Option[TraceContext]] { override def initialValue() = None @@ -68,13 +34,13 @@ object Tracer { def clear = ctx.remove() def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - def start = ??? //set(newTraceContext) + def start = set(newTraceContext) def stop = ctx.get match { case Some(context) => context.close case None => } - //def newTraceContext(): TraceContext = TraceContext() + def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) } diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 6b32550f..62d7f57e 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -1,31 +1,34 @@ package kamon import java.util.UUID -import akka.actor.{ActorSystem, ActorPath} +import akka.actor._ import akka.agent.Agent import java.util.concurrent.TimeUnit import scala.util.{Failure, Success} import akka.util.Timeout -case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { +case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { implicit val timeout = Timeout(30, TimeUnit.SECONDS) implicit val as = Kamon.actorSystem.dispatcher - def append(entry: TraceEntry) = entries send (entry :: _) - def close = entries.future.onComplete({ - case Success(list) => Kamon.publish(FullTransaction(id, list)) - case Failure(t) => println("WTF!") - }) + def append(entry: TraceEntry) = entries ! entry + def close = entries ! "Close" // TODO type this thing!. } object TraceContext { - implicit val as2 = Kamon.actorSystem.dispatcher - def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) + def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } +class TraceAccumulator extends Actor { + def receive = { + case a => println("Trace Accumulated: "+a) + } +} + + trait TraceEntry case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala index 68ee808b..24661445 100644 --- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala +++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala @@ -10,9 +10,9 @@ trait TraceContextSwap { def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { ctx match { case Some(context) => { - Kamon.set(context) + Tracer.set(context) val bodyResult = primary - Kamon.clear + Tracer.clear bodyResult } diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index 599f2a7a..a1c099d4 100644 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -5,7 +5,7 @@ import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout import scala.util.{Random, Success, Failure} import scala.concurrent.Future @@ -36,7 +36,7 @@ case class Pong() class PingActor extends Actor with ActorLogging { - val pong = context.actorOf(Props[PongActor]) + val pong = context.actorOf(Props[PongActor], "Pong") val random = new Random() def receive = { case Pong() => { @@ -66,14 +66,14 @@ object TryAkka extends App{ } })) - Kamon.start + Tracer.start for(i <- 1 to 4) { - val ping = system.actorOf(Props[PingActor]) + val ping = system.actorOf(Props[PingActor], "Ping" + i) ping ! Pong() } - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -86,13 +86,13 @@ object TryAkka extends App{ - Kamon.start + //Tracer.start - Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } - Kamon.stop + Tracer.stop //Thread.sleep(3000) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 82915ce9..7d3e36ca 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -3,12 +3,10 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Kamon, TraceContext} +import kamon.{Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} -import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} -import com.codahale.metrics -import kamon.instrumentation.TraceableMessage import scala.Some case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -18,16 +16,19 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + val t2 = Metrics.registry.timer("some" + "LATENCY") + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(actor) - val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") - pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + //val actorName = MetricDirectory.nameForActor(actor) + //val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) + } } @@ -48,6 +49,7 @@ class ActorCellInvokeInstrumentation { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + //println("=====> Created ActorCell for: "+ref.toString()) /** TODO: Find a better way to filter the things we don't want to measure. */ //if(system.name != "kamon" && actorName.startsWith("/user")) { processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") @@ -56,17 +58,17 @@ class ActorCellInvokeInstrumentation { } - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - println("ENVELOPE --------------------->"+envelope) + //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - timer.stop() + //timer.stop() val originalEnvelope = envelope.copy(message = msg) @@ -74,10 +76,10 @@ class ActorCellInvokeInstrumentation { val pt = processingTimeTimer.time() ctx match { case Some(c) => { - Kamon.set(c) - println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + Tracer.set(c) + //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) pjp.proceedWith(originalEnvelope) - Kamon.clear + Tracer.clear } case None => pjp.proceedWith(originalEnvelope) } @@ -87,3 +89,31 @@ class ActorCellInvokeInstrumentation { } } } + + +@Aspect +class UnregisteredActorRefInstrumentation { + @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)") + def sprayResponderHandle(message: Any, sender: ActorRef) = {} + + @Around("sprayResponderHandle(message, sender)") + def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { + import ProceedingJoinPointPimp._ + println("Handling unregistered actor ref message: "+message) + message match { + case TraceableMessage(ctx, msg, timer) => { + timer.stop() + + ctx match { + case Some(c) => { + Tracer.set(c) + pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. + Tracer.clear + } + case None => pjp.proceedWith(msg.asInstanceOf[AnyRef]) + } + } + case _ => pjp.proceed + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index c21502ac..6a1e291f 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -48,12 +48,12 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def enqueue(receiver: ActorRef, handle: Envelope) = { delegate.enqueue(receiver, handle) - queueSizeHistogram.update(numberOfMessages) + //queueSizeHistogram.update(numberOfMessages) } def dequeue(): Envelope = { val envelope = delegate.dequeue() - queueSizeHistogram.update(numberOfMessages) + //queueSizeHistogram.update(numberOfMessages) envelope } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index e75a638f..30041321 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -1,7 +1,7 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ -import kamon.{Kamon, TraceContext} +import kamon.{Tracer, TraceContext} import org.aspectj.lang.ProceedingJoinPoint import scala.Some @@ -37,7 +37,7 @@ class RunnableInstrumentation { * Aspect members */ - private val traceContext = Kamon.context + private val traceContext = Tracer.context /** diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala index cdc0a334..edf532ae 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -1,9 +1,10 @@ package kamon.metric -import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} +import java.util.concurrent.TimeUnit import akka.actor.ActorRef import com.codahale.metrics import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} +import scala.collection.concurrent.TrieMap object Metrics { @@ -85,8 +86,7 @@ trait HistogramSnapshot { case class ActorSystemMetrics(actorSystemName: String) { - import scala.collection.JavaConverters._ - val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala + val dispatchers = TrieMap.empty[String, DispatcherMetricCollector] private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) |