aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2013-08-26 22:25:08 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2013-08-26 22:25:08 -0300
commit79f2577ea160b8da6b865454ffa8622bba02f352 (patch)
treeab67f2c4850e38c493cd712029b26fb5c52cde4b /kamon-core/src/main/scala/kamon
parent34ec35a74d3ebbc2a520be9b64f39ddd6ce7e5e1 (diff)
parent2ac1427cee74f573a208c8983378f7c678a7621c (diff)
downloadKamon-79f2577ea160b8da6b865454ffa8622bba02f352.tar.gz
Kamon-79f2577ea160b8da6b865454ffa8622bba02f352.tar.bz2
Kamon-79f2577ea160b8da6b865454ffa8622bba02f352.zip
Merge branch 'master' of github.com:dpsoft/Kamon
Diffstat (limited to 'kamon-core/src/main/scala/kamon')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala48
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContextSwap.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/executor/eventbus.scala16
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala60
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala6
8 files changed, 81 insertions, 82 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index c3080909..298f43eb 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -1,42 +1,17 @@
package kamon
import akka.actor.{Actor, Props, ActorSystem}
-import scala.collection.JavaConverters._
-import java.util.concurrent.ConcurrentHashMap
-import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
-import scala.concurrent.duration.{FiniteDuration, Duration}
+import kamon.metric.{HistogramSnapshot, ActorSystemMetrics}
+import scala.concurrent.duration.FiniteDuration
import com.newrelic.api.agent.NewRelic
+import scala.collection.concurrent.TrieMap
object Kamon {
-
- val ctx = new ThreadLocal[Option[TraceContext]] {
- override def initialValue() = None
- }
-
implicit lazy val actorSystem = ActorSystem("kamon")
-
- def context() = ctx.get()
- def clear = ctx.remove()
- def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
-
- def start = set(newTraceContext)
- def stop = ctx.get match {
- case Some(context) => context.close
- case None =>
- }
-
- def newTraceContext(): TraceContext = TraceContext()
-
-
- val publisher = actorSystem.actorOf(Props[TransactionPublisher])
-
- def publish(tx: FullTransaction) = publisher ! tx
-
-
-
object Metric {
- val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+
+ val actorSystems = TrieMap.empty[String, ActorSystemMetrics]
def actorSystemNames: List[String] = actorSystems.keys.toList
def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
@@ -44,21 +19,12 @@ object Kamon {
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
}
-
-
val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
}
-
-
-
-
-
-
-
object Tracer {
val ctx = new ThreadLocal[Option[TraceContext]] {
override def initialValue() = None
@@ -68,13 +34,13 @@ object Tracer {
def clear = ctx.remove()
def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
- def start = ??? //set(newTraceContext)
+ def start = set(newTraceContext)
def stop = ctx.get match {
case Some(context) => context.close
case None =>
}
- //def newTraceContext(): TraceContext = TraceContext()
+ def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem)
}
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 6b32550f..62d7f57e 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -1,31 +1,34 @@
package kamon
import java.util.UUID
-import akka.actor.{ActorSystem, ActorPath}
+import akka.actor._
import akka.agent.Agent
import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success}
import akka.util.Timeout
-case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) {
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
implicit val as = Kamon.actorSystem.dispatcher
- def append(entry: TraceEntry) = entries send (entry :: _)
- def close = entries.future.onComplete({
- case Success(list) => Kamon.publish(FullTransaction(id, list))
- case Failure(t) => println("WTF!")
- })
+ def append(entry: TraceEntry) = entries ! entry
+ def close = entries ! "Close" // TODO type this thing!.
}
object TraceContext {
- implicit val as2 = Kamon.actorSystem.dispatcher
- def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+ def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
}
+class TraceAccumulator extends Actor {
+ def receive = {
+ case a => println("Trace Accumulated: "+a)
+ }
+}
+
+
trait TraceEntry
case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
index 68ee808b..24661445 100644
--- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
@@ -10,9 +10,9 @@ trait TraceContextSwap {
def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
ctx match {
case Some(context) => {
- Kamon.set(context)
+ Tracer.set(context)
val bodyResult = primary
- Kamon.clear
+ Tracer.clear
bodyResult
}
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
index 599f2a7a..a1c099d4 100644
--- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -5,7 +5,7 @@ import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit
-import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
+import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
import scala.util.{Random, Success, Failure}
import scala.concurrent.Future
@@ -36,7 +36,7 @@ case class Pong()
class PingActor extends Actor with ActorLogging {
- val pong = context.actorOf(Props[PongActor])
+ val pong = context.actorOf(Props[PongActor], "Pong")
val random = new Random()
def receive = {
case Pong() => {
@@ -66,14 +66,14 @@ object TryAkka extends App{
}
}))
- Kamon.start
+ Tracer.start
for(i <- 1 to 4) {
- val ping = system.actorOf(Props[PingActor])
+ val ping = system.actorOf(Props[PingActor], "Ping" + i)
ping ! Pong()
}
- def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body")
/*
val newRelicReporter = new NewRelicReporter(registry)
@@ -86,13 +86,13 @@ object TryAkka extends App{
- Kamon.start
+ //Tracer.start
- Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
+ Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
threadPrintln("Before doing it")
val f = Future { threadPrintln("This is happening inside the future body") }
- Kamon.stop
+ Tracer.stop
//Thread.sleep(3000)
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 82915ce9..7d3e36ca 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -3,12 +3,10 @@ package kamon.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
-import kamon.{Kamon, TraceContext}
+import kamon.{Tracer, TraceContext}
import akka.dispatch.{MessageDispatcher, Envelope}
-import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
+import com.codahale.metrics.Timer
import kamon.metric.{MetricDirectory, Metrics}
-import com.codahale.metrics
-import kamon.instrumentation.TraceableMessage
import scala.Some
case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
@@ -18,16 +16,19 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti
class ActorRefTellInstrumentation {
import ProceedingJoinPointPimp._
- @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)")
+ val t2 = Metrics.registry.timer("some" + "LATENCY")
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)")
def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(actor, message, sender)")
def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
- val actorName = MetricDirectory.nameForActor(actor)
- val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
- pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender)
+ //val actorName = MetricDirectory.nameForActor(actor)
+ //val t = Metrics.registry.timer(actorName + "LATENCY")
+ //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]")
+ pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender)
+
}
}
@@ -48,6 +49,7 @@ class ActorCellInvokeInstrumentation {
val actorName = MetricDirectory.nameForActor(ref)
val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
+ //println("=====> Created ActorCell for: "+ref.toString())
/** TODO: Find a better way to filter the things we don't want to measure. */
//if(system.name != "kamon" && actorName.startsWith("/user")) {
processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
@@ -56,17 +58,17 @@ class ActorCellInvokeInstrumentation {
}
- @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
@Around("invokingActorBehaviourAtActorCell(envelope)")
def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
import ProceedingJoinPointPimp._
- println("ENVELOPE --------------------->"+envelope)
+ //println("ENVELOPE --------------------->"+envelope)
envelope match {
case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
- timer.stop()
+ //timer.stop()
val originalEnvelope = envelope.copy(message = msg)
@@ -74,10 +76,10 @@ class ActorCellInvokeInstrumentation {
val pt = processingTimeTimer.time()
ctx match {
case Some(c) => {
- Kamon.set(c)
- println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ Tracer.set(c)
+ //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
pjp.proceedWith(originalEnvelope)
- Kamon.clear
+ Tracer.clear
}
case None => pjp.proceedWith(originalEnvelope)
}
@@ -87,3 +89,31 @@ class ActorCellInvokeInstrumentation {
}
}
}
+
+
+@Aspect
+class UnregisteredActorRefInstrumentation {
+ @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)")
+ def sprayResponderHandle(message: Any, sender: ActorRef) = {}
+
+ @Around("sprayResponderHandle(message, sender)")
+ def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
+ import ProceedingJoinPointPimp._
+ println("Handling unregistered actor ref message: "+message)
+ message match {
+ case TraceableMessage(ctx, msg, timer) => {
+ timer.stop()
+
+ ctx match {
+ case Some(c) => {
+ Tracer.set(c)
+ pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation.
+ Tracer.clear
+ }
+ case None => pjp.proceedWith(msg.asInstanceOf[AnyRef])
+ }
+ }
+ case _ => pjp.proceed
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
index c21502ac..6a1e291f 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -48,12 +48,12 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram:
def enqueue(receiver: ActorRef, handle: Envelope) = {
delegate.enqueue(receiver, handle)
- queueSizeHistogram.update(numberOfMessages)
+ //queueSizeHistogram.update(numberOfMessages)
}
def dequeue(): Envelope = {
val envelope = delegate.dequeue()
- queueSizeHistogram.update(numberOfMessages)
+ //queueSizeHistogram.update(numberOfMessages)
envelope
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index e75a638f..30041321 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -1,7 +1,7 @@
package kamon.instrumentation
import org.aspectj.lang.annotation._
-import kamon.{Kamon, TraceContext}
+import kamon.{Tracer, TraceContext}
import org.aspectj.lang.ProceedingJoinPoint
import scala.Some
@@ -37,7 +37,7 @@ class RunnableInstrumentation {
* Aspect members
*/
- private val traceContext = Kamon.context
+ private val traceContext = Tracer.context
/**
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
index cdc0a334..edf532ae 100644
--- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -1,9 +1,10 @@
package kamon.metric
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
+import java.util.concurrent.TimeUnit
import akka.actor.ActorRef
import com.codahale.metrics
import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}
+import scala.collection.concurrent.TrieMap
object Metrics {
@@ -85,8 +86,7 @@ trait HistogramSnapshot {
case class ActorSystemMetrics(actorSystemName: String) {
- import scala.collection.JavaConverters._
- val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala
+ val dispatchers = TrieMap.empty[String, DispatcherMetricCollector]
private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())