author     Ivan Topolnjak <ivantopo@gmail.com>  2013-08-08 01:04:27 -0300
committer  Ivan Topolnjak <ivantopo@gmail.com>  2013-08-08 01:04:27 -0300
commit     d8a8a0cc0efb79697605efed1efbaf99b98921dd (patch)
tree       d555199f0c63b690ec51805b496ee2d54eb014da
parent     695b9b6d2bdf55afd7fe420d9a6fc36d3d45ed31 (diff)
parent     923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (diff)
download   Kamon-d8a8a0cc0efb79697605efed1efbaf99b98921dd.tar.gz
           Kamon-d8a8a0cc0efb79697605efed1efbaf99b98921dd.tar.bz2
           Kamon-d8a8a0cc0efb79697605efed1efbaf99b98921dd.zip
Merge branch 'master' of github.com:dpsoft/Kamon
-rw-r--r--  .gitignore  1
-rw-r--r--  README.md  7
-rw-r--r--  kamon-core/src/main/resources/META-INF/aop.xml  34
-rw-r--r--  kamon-core/src/main/resources/application.conf (renamed from src/main/resources/application.conf)  0
-rw-r--r--  kamon-core/src/main/resources/newrelic.yml (renamed from src/main/resources/newrelic.yml)  4
-rw-r--r--  kamon-core/src/main/scala/kamon/Kamon.scala  132
-rw-r--r--  kamon-core/src/main/scala/kamon/TraceContext.scala (renamed from src/main/scala/kamon/TraceContext.scala)  37
-rw-r--r--  kamon-core/src/main/scala/kamon/TraceContextSwap.scala (renamed from src/main/scala/kamon/TraceContextSwap.scala)  0
-rw-r--r--  kamon-core/src/main/scala/kamon/TransactionPublisher.scala (renamed from src/main/scala/kamon/TransactionPublisher.scala)  0
-rw-r--r--  kamon-core/src/main/scala/kamon/executor/eventbus.scala (renamed from src/main/scala/kamon/executor/eventbus.scala)  33
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala  89
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala  23
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala  245
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala  73
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala (renamed from src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala)  6
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala  49
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala  67
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala  12
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/MetricFilter.scala  6
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/Metrics.scala  146
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala (renamed from src/main/scala/kamon/metric/MetricsUtils.scala)  0
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala (renamed from src/main/scala/kamon/metric/NewRelicReporter.scala)  26
-rw-r--r--  kamon-core/src/main/scala/spraytest/ClientTest.scala (renamed from src/main/scala/spraytest/ClientTest.scala)  4
-rw-r--r--  kamon-core/src/main/scala/spraytest/FutureTesting.scala (renamed from src/main/scala/spraytest/FutureTesting.scala)  2
-rw-r--r--  kamon-core/src/main/scala/test/PingPong.scala  34
-rw-r--r--  kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala  45
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala  22
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala  34
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala  53
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala (renamed from src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala)  15
-rw-r--r--  kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala  14
-rw-r--r--  kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala  28
-rw-r--r--  project/Build.scala  17
-rw-r--r--  project/Dependencies.scala  24
-rw-r--r--  project/NewRelic.scala  2
-rw-r--r--  project/Settings.scala  4
-rw-r--r--  project/plugins.sbt  1
-rw-r--r--  src/main/resources/META-INF/aop.xml  27
-rw-r--r--  src/main/scala/akka/ActorInstrumentation.scala  46
-rw-r--r--  src/main/scala/akka/ActorSystemAspect.scala  18
-rw-r--r--  src/main/scala/akka/MailboxAspect.scala  16
-rw-r--r--  src/main/scala/akka/MailboxMetrics.scala  35
-rw-r--r--  src/main/scala/akka/PoolMetrics.scala  29
-rw-r--r--  src/main/scala/akka/PoolMonitorInstrumentation.scala  16
-rw-r--r--  src/main/scala/akka/Tracer.scala  24
-rw-r--r--  src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala  57
-rw-r--r--  src/main/scala/kamon/Kamon.scala  31
-rw-r--r--  src/main/scala/kamon/metric/Metrics.scala  14
-rw-r--r--  src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala  42
-rw-r--r--  src/test/scala/kamon/instrumentation/ScalaFutures.scala  32
50 files changed, 1236 insertions, 440 deletions
diff --git a/.gitignore b/.gitignore
index ffb18ce4..c185efd9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
*.class
*.log
.history
+*.sc
# sbt specific
dist/*
diff --git a/README.md b/README.md
index 84ea6326..93386982 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,9 @@
Kamon
=====
+
+
+
+
+/metrics/actorsystem/{actorsystem-name}/dispatcher/{dispatcher-name}/
+For each dispatcher, show:
+ - \ No newline at end of file
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..e6d61fa1
--- /dev/null
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,34 @@
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <weaver options="-verbose -showWeaveInfo">
+ <!--<dump within="*" beforeandafter="true"/>-->
+ </weaver>
+
+ <aspects>
+
+ <aspect name="kamon.instrumentation.ActorRefTellInstrumentation"/>
+ <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/>
+ <aspect name="kamon.instrumentation.RunnableInstrumentation" />
+ <aspect name="kamon.instrumentation.MessageQueueInstrumentation" />
+
+ <aspect name="kamon.instrumentation.InceptionAspect"/>
+
+ <!-- ExecutorService Instrumentation for Akka. -->
+<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
+ <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>-->
+ <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
+ <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>
+
+
+
+ <include within="*"/>
+ <exclude within="javax..*"/>
+ <exclude within="org.aspectj..*"/>
+ <exclude within="scala..*"/>
+ <exclude within="scalaz..*"/>
+ <exclude within="scalad..*"/>
+ <exclude within="play..*"/>
+ </aspects>
+
+</aspectj>
diff --git a/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf
index 370acae9..370acae9 100644
--- a/src/main/resources/application.conf
+++ b/kamon-core/src/main/resources/application.conf
diff --git a/src/main/resources/newrelic.yml b/kamon-core/src/main/resources/newrelic.yml
index e347635e..1b1ad53b 100644
--- a/src/main/resources/newrelic.yml
+++ b/kamon-core/src/main/resources/newrelic.yml
@@ -48,7 +48,7 @@ common: &default_settings
# This setting is dynamic, so changes do not require restarting your application.
# The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest
# Default is info.
- log_level: info
+ log_level: finest
enable_custom_tracing: true
# Log all data to and from New Relic in plain text.
@@ -70,7 +70,7 @@ common: &default_settings
# The log file directory.
# Default is the logs directory in the newrelic.jar parent directory.
- #log_file_path:
+ log_file_path: /home/ivantopo/Desktop/tmp
# The agent communicates with New Relic via https by
# default. If you want to communicate with newrelic via http,
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
new file mode 100644
index 00000000..c3080909
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -0,0 +1,132 @@
+package kamon
+
+import akka.actor.{Actor, Props, ActorSystem}
+import scala.collection.JavaConverters._
+import java.util.concurrent.ConcurrentHashMap
+import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import com.newrelic.api.agent.NewRelic
+
+object Kamon {
+
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ implicit lazy val actorSystem = ActorSystem("kamon")
+
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ def newTraceContext(): TraceContext = TraceContext()
+
+
+ val publisher = actorSystem.actorOf(Props[TransactionPublisher])
+
+ def publish(tx: FullTransaction) = publisher ! tx
+
+
+
+ object Metric {
+ val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+
+ def actorSystemNames: List[String] = actorSystems.keys.toList
+ def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
+
+ def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
+ }
+
+
+
+ val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+
+}
+
+
+
+
+
+
+
+
+
+object Tracer {
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = ??? //set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ //def newTraceContext(): TraceContext = TraceContext()
+}
+
+
+class MetricManager extends Actor {
+ implicit val ec = context.system.dispatcher
+
+ def receive = {
+ case RegisterForAllDispatchers(frequency) => {
+ val subscriber = sender
+ context.system.scheduler.schedule(frequency, frequency) {
+ Kamon.Metric.actorSystems.foreach {
+ case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach {
+ case (dispatcherName, dispatcherMetrics) => {
+ val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
+ val poolSize = dispatcherMetrics.poolSize.snapshot
+ val queueSize = dispatcherMetrics.queueSize.snapshot
+
+ subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)
+
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+case class RegisterForAllDispatchers(frequency: FiniteDuration)
+case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot)
+
+
+
+
+
+
+class NewrelicReporterActor extends Actor {
+ import scala.concurrent.duration._
+
+ Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+
+ def receive = {
+ case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
+ /*println("PUBLISHED DISPATCHER STATS")
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat)
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat))
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/
+
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat)
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat))
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat)
+ }
+ }
+} \ No newline at end of file
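
A minimal usage sketch of the thread-local context API defined in Kamon.scala above, assuming only the Kamon object from this commit; the wrapper object name is illustrative. Kamon.start installs a fresh TraceContext in the calling thread, Kamon.context() reads it, and Kamon.stop/Kamon.clear close and remove it.

import kamon.Kamon

// Illustrative sketch: start a context, read it, then close and clear it.
object TraceContextUsageSketch extends App {
  Kamon.start                                             // installs a fresh TraceContext in this thread
  println(s"context inside the block: ${Kamon.context()}")

  Kamon.stop                                              // closes the current context, if any
  Kamon.clear                                             // removes it from the ThreadLocal
  println(s"context after clearing: ${Kamon.context()}")  // None
}
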
diff --git a/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 19ebc578..6b32550f 100644
--- a/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -20,6 +20,7 @@ case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]],
}
object TraceContext {
+ implicit val as2 = Kamon.actorSystem.dispatcher
def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
}
@@ -28,3 +29,39 @@ object TraceContext {
trait TraceEntry
case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
+
+
+
+case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry])
+
+
+
+
+
+object Collector {
+
+}
+
+trait TraceEntryStorage {
+ def store(entry: TraceEntry): Boolean
+}
+
+class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) {
+ def store(entry: TraceEntry) = storage.store(entry)
+}
+
+object ThreadLocalTraceEntryStorage extends TraceEntryStorage {
+
+ private val storage = new ThreadLocal[List[TraceEntry]] {
+ override def initialValue(): List[TraceEntry] = Nil
+ }
+
+ def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get)
+
+ def store(entry: TraceEntry): Boolean = {
+ update(entry :: _)
+ true
+ }
+}
+
+
diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
index 68ee808b..68ee808b 100644
--- a/src/main/scala/kamon/TraceContextSwap.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
index 0626b91d..0626b91d 100644
--- a/src/main/scala/kamon/TransactionPublisher.scala
+++ b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
index ed76334f..599f2a7a 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -7,8 +7,7 @@ import java.util.concurrent.TimeUnit
import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
-import scala.util.Success
-import scala.util.Failure
+import scala.util.{Random, Success, Failure}
import scala.concurrent.Future
trait Message
@@ -35,31 +34,24 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{
case class Ping()
case class Pong()
-class PingActor(val target: ActorRef) extends Actor with ActorLogging {
- implicit def executionContext = context.dispatcher
- implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+class PingActor extends Actor with ActorLogging {
+ val pong = context.actorOf(Props[PongActor])
+ val random = new Random()
def receive = {
case Pong() => {
- log.info(s"pong with context ${Kamon.context}")
- Thread.sleep(1000)
- sender ! Ping()
+ //Thread.sleep(random.nextInt(2000))
+ //log.info("Message from Ping")
+ pong ! Ping()
}
- case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
}
-
- def withAny(): Any = {1}
- def withAnyRef(): AnyRef = {new Object}
}
class PongActor extends Actor with ActorLogging {
def receive = {
case Ping() => {
- Thread.sleep(3000)
sender ! Pong()
- log.info(s"ping with context ${Kamon.context}")
}
- case a: Any => println(s"Got ${a} in PONG")
}
}
@@ -74,8 +66,11 @@ object TryAkka extends App{
}
}))
-
-
+ Kamon.start
+ for(i <- 1 to 4) {
+ val ping = system.actorOf(Props[PingActor])
+ ping ! Pong()
+ }
def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
@@ -100,8 +95,8 @@ object TryAkka extends App{
Kamon.stop
- Thread.sleep(3000)
- system.shutdown()
+ //Thread.sleep(3000)
+ //system.shutdown()
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..82915ce9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,89 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{Props, ActorSystem, ActorRef}
+import kamon.{Kamon, TraceContext}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
+import kamon.metric.{MetricDirectory, Metrics}
+import com.codahale.metrics
+import kamon.instrumentation.TraceableMessage
+import scala.Some
+
+case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
+
+
+@Aspect
+class ActorRefTellInstrumentation {
+ import ProceedingJoinPointPimp._
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)")
+ def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
+
+ @Around("sendingMessageToActorRef(actor, message, sender)")
+ def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
+
+ val actorName = MetricDirectory.nameForActor(actor)
+ val t = Metrics.registry.timer(actorName + "LATENCY")
+ //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
+ pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender)
+ }
+}
+
+
+@Aspect("perthis(actorCellCreation(..))")
+class ActorCellInvokeInstrumentation {
+
+ var processingTimeTimer: Timer = _
+ var shouldTrack = false
+
+ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val actorName = MetricDirectory.nameForActor(ref)
+ val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
+
+ /** TODO: Find a better way to filter the things we don't want to measure. */
+ //if(system.name != "kamon" && actorName.startsWith("/user")) {
+ processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
+ shouldTrack = true
+ //}
+ }
+
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ import ProceedingJoinPointPimp._
+ println("ENVELOPE --------------------->"+envelope)
+ envelope match {
+ case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
+ timer.stop()
+
+ val originalEnvelope = envelope.copy(message = msg)
+
+ //println("PROCESSING TIME TIMER: "+processingTimeTimer)
+ val pt = processingTimeTimer.time()
+ ctx match {
+ case Some(c) => {
+ Kamon.set(c)
+ println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ pjp.proceedWith(originalEnvelope)
+ Kamon.clear
+ }
+ case None => pjp.proceedWith(originalEnvelope)
+ }
+ pt.stop()
+ }
+ case _ => pjp.proceed
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
new file mode 100644
index 00000000..84c20c52
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
@@ -0,0 +1,23 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+
+trait ProceedingJoinPointPimp {
+ import language.implicitConversions
+
+ implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
+}
+
+object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
+
+case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
+ def proceedWith(newUniqueArg: AnyRef) = {
+ val args = pjp.getArgs
+ args.update(0, newUniqueArg)
+ pjp.proceed(args)
+ }
+
+ def proceedWithTarget(args: AnyRef*) = {
+ pjp.proceed(args.toArray)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
new file mode 100644
index 00000000..b4f8a475
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -0,0 +1,245 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent._
+import org.aspectj.lang.ProceedingJoinPoint
+import java.util
+import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector}
+import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
+import com.typesafe.config.Config
+import kamon.Kamon
+import scala.concurrent.forkjoin.ForkJoinPool
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+
+
+@Aspect
+class ActorSystemInstrumentation {
+
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
+ def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
+
+ @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
+ def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
+
+ Kamon.Metric.registerActorSystem(name)
+ }
+}
+
+@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
+class ForkJoinPoolInstrumentation {
+ var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _
+
+ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
+ def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
+
+ @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
+ def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
+ val (actorSystemName, dispatcherName) = threadFactory match {
+ case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
+ case _ => ("Unknown", "Unknown")
+ }
+
+ val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
+ for(m <- metrics) {
+ activeThreadsHistogram = m.activeThreadCount
+ poolSizeHistogram = m.poolSize
+ println(s"Registered $dispatcherName for actor system $actorSystemName")
+ }
+ }
+
+ def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
+ knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unknown", "Unknown"))
+ }
+
+
+
+
+ @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
+ def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
+
+ @After("forkJoinScan(fjp)")
+ def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
+ activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ poolSizeHistogram.update(fjp.getPoolSize)
+ }
+
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+/**
+ * ExecutorService monitoring base:
+ */
+trait ExecutorServiceCollector {
+ def updateActiveThreadCount(diff: Int): Unit
+ def updateTotalThreadCount(diff: Int): Unit
+ def updateQueueSize(diff: Int): Unit
+}
+
+trait WatchedExecutorService {
+ def collector: ExecutorServiceCollector
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+trait ExecutorServiceMonitoring {
+ def dispatcherMetrics: DispatcherMetricCollector
+}
+
+class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
+ @volatile var dispatcherMetrics: DispatcherMetricCollector = _
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = delegate.createExecutorService
+}
+
+@Aspect
+class ExecutorServiceFactoryProviderInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
+ def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
+ true
+ }
+
+ @Around("factoryMethodCall(dispatcherName, threadFactory)")
+ def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
+
+ val actorSystemName = threadFactory match {
+ case m: MonitorableThreadFactory => m.name
+ case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
+ }
+
+ new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
+ }
+
+}
+
+
+@Aspect
+class NamedExecutorServiceFactoryDelegateInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
+ def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
+
+ @Around("factoryMethodCall(namedFactory)")
+ def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorService]
+ val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+
+ ExecutorServiceMetricCollector.register(executorFullName, delegate)
+
+ new NamedExecutorServiceDelegate(executorFullName, delegate)
+ }
+}
+
+case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
+ def shutdown() = {
+ ExecutorServiceMetricCollector.deregister(fullName)
+ delegate.shutdown()
+ }
+ def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+ def isShutdown: Boolean = delegate.isShutdown
+ def isTerminated: Boolean = delegate.isTerminated
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
+ def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
+ def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
+ def submit(task: Runnable): Future[_] = delegate.submit(task)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
+ def execute(command: Runnable) = delegate.execute(command)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
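
A brief sketch of the delegation pattern used by NamedExecutorServiceDelegate above: every call is forwarded to the wrapped executor, and shutdown additionally deregisters the metrics collected under the full name. The pool and names below are illustrative assumptions.

import java.util.concurrent.Executors
import kamon.instrumentation.NamedExecutorServiceDelegate

// Illustrative sketch: wrap a plain executor so it carries a metric name root.
object DelegateSketch extends App {
  val plain = Executors.newFixedThreadPool(2)
  val named = NamedExecutorServiceDelegate("/ActorSystem/sample/Dispatcher/default/", plain)

  named.execute(new Runnable { def run() = println("running through the delegate") })
  named.shutdown()   // forwards the shutdown and deregisters metrics under the full name
}
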
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
new file mode 100644
index 00000000..c21502ac
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -0,0 +1,73 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
+import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
+import akka.actor.{ActorSystem, ActorRef}
+import kamon.metric.{Metrics, MetricDirectory}
+import org.aspectj.lang.ProceedingJoinPoint
+
+
+/**
+ * For Mailboxes we would like to track the queue size and message latency. Currently the latency
+ * will be gathered from the ActorCellMetrics.
+ */
+
+
+@Aspect
+class MessageQueueInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)")
+ def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {}
+
+ @Around("messageQueueCreation(owner, system)")
+ def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
+ val delegate = pjp.proceed.asInstanceOf[MessageQueue]
+
+ // We are not interested in monitoring mailboxes if we don't know where they belong.
+ val monitoredMailbox = for(own <- owner; sys <- system) yield {
+ val systemName = sys.name
+ val ownerName = MetricDirectory.nameForActor(own)
+ val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName)
+
+ val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir())
+ Metrics.include(mailBoxName, queueSizeHistogram)
+
+ new MonitoredMessageQueue(delegate, queueSizeHistogram)
+ }
+
+ monitoredMailbox match {
+ case None => delegate
+ case Some(mmb) => mmb
+ }
+ }
+}
+
+
+class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
+
+ def enqueue(receiver: ActorRef, handle: Envelope) = {
+ delegate.enqueue(receiver, handle)
+ queueSizeHistogram.update(numberOfMessages)
+ }
+
+ def dequeue(): Envelope = {
+ val envelope = delegate.dequeue()
+ queueSizeHistogram.update(numberOfMessages)
+
+ envelope
+ }
+
+ def numberOfMessages: Int = delegate.numberOfMessages
+ def hasMessages: Boolean = delegate.hasMessages
+ def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
+}
+
+
+
+
+
+
+
+
+
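
For reference, a standalone sketch of how the queue-size histogram used by MonitoredMessageQueue behaves: it is a plain Codahale Histogram fed with the queue depth after every enqueue and dequeue. The simulated depths and the object name below are illustrative only.

import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}

// Illustrative sketch: feed a histogram with simulated queue depths.
object QueueSizeHistogramSketch extends App {
  val queueSize = new Histogram(new ExponentiallyDecayingReservoir())

  // Depths observed after three enqueues followed by two dequeues.
  Seq(1, 2, 3, 2, 1).foreach(depth => queueSize.update(depth))

  val snapshot = queueSize.getSnapshot
  println(s"count=${queueSize.getCount} max=${snapshot.getMax} min=${snapshot.getMin}")
}
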
diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index ef908625..e75a638f 100644
--- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -45,6 +45,12 @@ class RunnableInstrumentation {
*/
import kamon.TraceContextSwap.withContext
+ @Before("instrumentedRunnableCreation()")
+ def beforeCreation = {
+ //println((new Throwable).getStackTraceString)
+ }
+
+
@Around("runnableExecution()")
def around(pjp: ProceedingJoinPoint) = {
import pjp._
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
new file mode 100644
index 00000000..74261403
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
@@ -0,0 +1,49 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
+
+class ActorCage(val name: String, val size: Int) {
+
+ def doIt: Unit = println("name")
+}
+
+trait CageMonitoring {
+ def histogram: Histogram
+ def count(value: Int): Unit
+}
+
+class CageMonitoringImp extends CageMonitoring{
+ final val histogram = new Histogram(new ExponentiallyDecayingReservoir())
+
+ def count(value: Int) = histogram.update(value)
+
+}
+
+
+@Aspect
+class InceptionAspect {
+
+ @DeclareMixin("kamon.instrumentation.ActorCage")
+ def mixin: CageMonitoring = new CageMonitoringImp
+
+
+ @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)")
+ def theActorCageDidIt(actorCage: CageMonitoring) = {}
+
+ @After("theActorCageDidIt(actorCage)")
+ def afterDoingIt(actorCage: CageMonitoring) = {
+ actorCage.count(1)
+ actorCage.histogram.getSnapshot.dump(System.out)
+ }
+
+
+
+}
+
+
+object Runner extends App {
+ val cage = new ActorCage("ivan", 10)
+
+ cage.doIt
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
new file mode 100644
index 00000000..54a13f39
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
@@ -0,0 +1,67 @@
+package kamon.metric
+
+import java.util.concurrent.{ThreadPoolExecutor, ExecutorService}
+import scala.concurrent.forkjoin.ForkJoinPool
+import com.codahale.metrics.{Metric, MetricFilter}
+
+object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {
+
+ def register(fullName: String, executorService: ExecutorService) = executorService match {
+ case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp)
+ case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe)
+ case _ => // If it is an unknown Executor then just do nothing.
+ }
+
+ def deregister(fullName: String) = {
+ Metrics.registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+
+trait ForkJoinPoolMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+
+ def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
+ val forkJoinPoolGauge = newNumericGaugeFor(fjp) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
+ fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
+ fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+trait ThreadPoolExecutorMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+ def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
+ val tpeGauge = newNumericGaugeFor(tpe) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> tpeGauge(_.getQueue.size()),
+ fullName + poolSize -> tpeGauge(_.getPoolSize),
+ fullName + activeThreads -> tpeGauge(_.getActiveCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+
+object BasicExecutorMetricNames {
+ val queueSize = "queueSize"
+ val poolSize = "threads/poolSize"
+ val activeThreads = "threads/activeThreads"
+}
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
new file mode 100644
index 00000000..30635432
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
@@ -0,0 +1,12 @@
+package kamon.metric
+
+import com.codahale.metrics.Gauge
+
+trait GaugeGenerator {
+
+ def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] {
+ def getValue: V = generator(target)
+ }
+}
+
+object GaugeGenerator extends GaugeGenerator
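
A small usage sketch of newNumericGaugeFor, mirroring what ExecutorServiceMetricCollector does above; the executor and object names below are illustrative assumptions, not something the commit wires up.

import java.util.concurrent.{Executors, ThreadPoolExecutor}
import kamon.metric.GaugeGenerator._

// Illustrative sketch: expose a ThreadPoolExecutor property as a Codahale Gauge.
object GaugeGeneratorSketch extends App {
  val pool = Executors.newFixedThreadPool(4).asInstanceOf[ThreadPoolExecutor]
  val poolSizeGauge = newNumericGaugeFor(pool)(_.getPoolSize)

  pool.execute(new Runnable { def run() = () })  // force the pool to spawn a worker
  println(s"current pool size: ${poolSizeGauge.getValue}")
  pool.shutdown()
}
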
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
new file mode 100644
index 00000000..fb117968
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
@@ -0,0 +1,6 @@
+package kamon.metric
+
+object MetricFilter {
+ def actorSystem(system: String): Boolean = !system.startsWith("kamon")
+ def actor(path: String, system: String): Boolean = true
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
new file mode 100644
index 00000000..cdc0a334
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -0,0 +1,146 @@
+package kamon.metric
+
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
+import akka.actor.ActorRef
+import com.codahale.metrics
+import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}
+
+
+object Metrics {
+ val registry: MetricRegistry = new MetricRegistry
+
+ val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
+ //consoleReporter.build().start(45, TimeUnit.SECONDS)
+
+ //val newrelicReporter = NewRelicReporter(registry)
+ //newrelicReporter.start(5, TimeUnit.SECONDS)
+
+ def include(name: String, metric: Metric) = {
+ //registry.register(name, metric)
+ }
+
+ def exclude(name: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(metricName: String, metric: Metric): Boolean = metricName.startsWith(name)
+ })
+ }
+
+
+
+ def deregister(fullName: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+object Watched {
+ case object Actor
+ case object Dispatcher
+}
+
+object MetricDirectory {
+ def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
+
+ def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox"
+
+ def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/")
+
+ def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon")
+
+
+ def shouldInstrumentActor(actorPath: String): Boolean = {
+ !(actorPath.isEmpty || actorPath.startsWith("system"))
+ }
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
+
+
+
+
+trait Histogram {
+ def update(value: Long): Unit
+ def snapshot: HistogramSnapshot
+}
+
+trait HistogramSnapshot {
+ def median: Double
+ def max: Double
+ def min: Double
+}
+
+
+case class ActorSystemMetrics(actorSystemName: String) {
+ import scala.collection.JavaConverters._
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala
+
+ private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
+
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
+ val stats = createDispatcherCollector
+ dispatchers.put(dispatcherName, stats)
+ Some(stats)
+ }
+
+}
+
+
+case class CodahaleHistogram() extends Histogram {
+ private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
+
+ def update(value: Long) = histogram.update(value)
+ def snapshot: HistogramSnapshot = {
+ val snapshot = histogram.getSnapshot
+
+ CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
+ }
+}
+
+case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
+
+
+
+
+
+
+
+/**
+ * Dispatcher Metrics that we care about currently with a histogram-like nature:
+ * - Work Queue Size
+ * - Total/Active Thread Count
+ */
+
+
+
+import annotation.tailrec
+import java.util.concurrent.atomic.AtomicReference
+
+object Atomic {
+ def apply[T]( obj : T) = new Atomic(new AtomicReference(obj))
+ implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref)
+}
+
+class Atomic[T](val atomic : AtomicReference[T]) {
+ @tailrec
+ final def update(f: T => T) : T = {
+ val oldValue = atomic.get()
+ val newValue = f(oldValue)
+ if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
+ }
+
+ def get() = atomic.get()
+} \ No newline at end of file
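
A quick usage sketch of the Atomic wrapper defined above (object name illustrative): update retries the compare-and-set until the functional update succeeds, so concurrent increments are not lost.

import kamon.metric.Atomic

// Illustrative sketch: ten threads, one hundred functional increments each.
object AtomicSketch extends App {
  val counter = Atomic(0)

  val threads = (1 to 10).map(_ => new Thread(new Runnable {
    def run() = (1 to 100).foreach(_ => counter.update(_ + 1))
  }))
  threads.foreach(_.start())
  threads.foreach(_.join())

  println(counter.get())  // 1000
}
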
diff --git a/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
index 5b4ceaf4..5b4ceaf4 100644
--- a/src/main/scala/kamon/metric/MetricsUtils.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
index 56dce913..70f3e54a 100644
--- a/src/main/scala/kamon/metric/NewRelicReporter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -1,6 +1,7 @@
package kamon.metric
-import com.codahale.metrics._
+import com.codahale.metrics
+import metrics._
import java.util.concurrent.TimeUnit
import java.util
import com.newrelic.api.agent.NewRelic
@@ -9,6 +10,8 @@ import scala.collection.JavaConverters._
class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
+
+
private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
}
@@ -17,15 +20,32 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt
println(s"Logging to NewRelic: ${counter.getCount}")
}
- def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
+
+
+/* def processGauge(name: String, gauge: Gauge[_]) = {
+ println(s"the value is: "+gauge.getValue)
+ NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float])
+ }*/
+
+
+ def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
//Process Meters
meters.asScala.map{case(name, meter) => processMeter(name, meter)}
//Process Meters
counters.asScala.map{case(name, counter) => processCounter(name, counter)}
+
+ // Gauges
+ gauges.asScala.foreach{ case (name, gauge) => {
+ val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue()
+ val fullMetricName = "Custom" + name
+ NewRelic.recordMetric(fullMetricName, measure)
+ }}
}
+
+
}
object NewRelicReporter {
- def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
+ def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
} \ No newline at end of file
diff --git a/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala
index 7a95fc76..07532d0a 100644
--- a/src/main/scala/spraytest/ClientTest.scala
+++ b/kamon-core/src/main/scala/spraytest/ClientTest.scala
@@ -5,6 +5,8 @@ import spray.client.pipelining._
import spray.httpx.SprayJsonSupport
import spray.json._
import scala.concurrent.Future
+import spray.can.Http
+import akka.io.IO
/**
* BEGIN JSON Infrastructure
@@ -34,7 +36,7 @@ class ClientTest extends App {
import SprayJsonSupport._
-
+ val actor = IO(Http)
val pipeline = sendReceive ~> unmarshal[Container]
diff --git a/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
index f592f6d7..b864d6d6 100644
--- a/src/main/scala/spraytest/FutureTesting.scala
+++ b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
@@ -63,7 +63,7 @@ object TraceableFuture {
implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
- val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil))
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil))
new TraceableFuture(Future { wrappedBody })
}
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
new file mode 100644
index 00000000..f9d6869c
--- /dev/null
+++ b/kamon-core/src/main/scala/test/PingPong.scala
@@ -0,0 +1,34 @@
+package test
+
+import akka.actor.{Props, Actor, ActorSystem}
+
+object PingPong extends App {
+
+ val as = ActorSystem("ping-pong")
+
+ val pinger = as.actorOf(Props[Pinger])
+ val ponger = as.actorOf(Props[Ponger])
+
+ pinger.tell(Pong, ponger)
+
+
+ Thread.sleep(30000)
+ as.shutdown()
+
+
+}
+
+case object Ping
+case object Pong
+
+class Pinger extends Actor {
+ def receive = {
+ case Pong => sender ! Ping
+ }
+}
+
+class Ponger extends Actor {
+ def receive = {
+ case Ping => sender ! Pong
+ }
+}
diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
new file mode 100644
index 00000000..0026d953
--- /dev/null
+++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
@@ -0,0 +1,45 @@
+package akka.instrumentation
+
+import org.scalatest.{WordSpecLike, Matchers}
+import akka.actor.{Actor, Props, ActorSystem}
+
+import akka.testkit.{ImplicitSender, TestKit}
+import kamon.{TraceContext, Kamon}
+
+
+class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender {
+
+ "an instrumented actor ref" when {
+ "used inside the context of a transaction" should {
+ "propagate the trace context using bang" in new TraceContextEchoFixture {
+ echo ! "test"
+
+ expectMsg(Some(testTraceContext))
+ }
+
+ "propagate the trace context using tell" in {
+
+ }
+
+ "propagate the trace context using ask" in {
+
+ }
+ }
+ }
+
+ trait TraceContextEchoFixture {
+ val testTraceContext = Kamon.newTraceContext()
+ val echo = system.actorOf(Props[TraceContextEcho])
+
+ Kamon.set(testTraceContext)
+ }
+
+}
+
+class TraceContextEcho extends Actor {
+ def receive = {
+ case msg ⇒ sender ! Kamon.context()
+ }
+}
+
+
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
new file mode 100644
index 00000000..1eab6355
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
@@ -0,0 +1,22 @@
+package kamon.instrumentation
+
+import org.scalatest.{Matchers, WordSpec}
+import akka.actor.ActorSystem
+import kamon.Kamon
+
+class ActorSystemInstrumentationSpec extends WordSpec with Matchers {
+
+ // TODO: Selection filters to exclude unwanted actor systems. Read from configuration.
+
+ "the actor system instrumentation" should {
+ "register all actor systems created" in {
+ val as1 = ActorSystem("as1")
+ val as2 = ActorSystem("as2")
+
+
+ Kamon.Metric.actorSystem("as1") should not be (None)
+ Kamon.Metric.actorSystem("as2") should not be (None)
+ Kamon.Metric.actorSystem("unknown") should be (None)
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
new file mode 100644
index 00000000..89ef61f3
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
@@ -0,0 +1,34 @@
+package kamon.instrumentation
+
+import org.scalatest.{Matchers, WordSpec}
+import akka.actor.{Actor, Props, ActorSystem}
+import kamon.metric.MetricDirectory
+import kamon.Kamon
+
+class DispatcherInstrumentationSpec extends WordSpec with Matchers{
+
+
+ "the dispatcher instrumentation" should {
+ "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem {
+ val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers
+ (1 to 10).foreach(actor ! _)
+
+ val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot
+ println("Active max: "+active.max)
+ println("Active min: "+active.min)
+
+ }
+ }
+
+
+ trait SingleDispatcherActorSystem {
+ val actorSystem = ActorSystem("single-dispatcher")
+ val actor = actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case a => sender ! a;
+ }
+ }))
+
+ }
+}
+
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala
new file mode 100644
index 00000000..cc55ec92
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala
@@ -0,0 +1,53 @@
+package kamon.instrumentation
+
+import org.scalatest.WordSpec
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import java.util.concurrent.ConcurrentLinkedQueue
+import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope}
+import java.util.Queue
+import akka.actor.{ActorSystem, Actor}
+
+class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec {
+ def this() = this(ActorSystem("MessageQueueInstrumentationSpec"))
+
+
+ /*"A MonitoredMessageQueue" should {
+ "update the related histogram when a message is enqueued" in {
+ new PopulatedMessageQueueFixture {
+
+ assert(histogram.getSnapshot.getMax === 0)
+
+ for(i <- 1 to 3) { enqueueDummyMessage }
+
+ assert(histogram.getCount === 3)
+ assert(histogram.getSnapshot.getMax === 3)
+ assert(histogram.getSnapshot.getMin === 1)
+ }
+ }
+
+ "update the related histogram when a message is dequeued" in {
+ new PopulatedMessageQueueFixture {
+ for(i <- 1 to 3) { enqueueDummyMessage }
+ assert(histogram.getSnapshot.getMax === 3)
+
+ messageQueue.dequeue()
+ messageQueue.dequeue()
+
+ assert(histogram.getCount === 5)
+ assert(histogram.getSnapshot.getMax === 3)
+ assert(histogram.getSnapshot.getMin === 1)
+ }
+ }
+ }
+
+ trait PopulatedMessageQueueFixture {
+
+ val histogram = new Histogram(new ExponentiallyDecayingReservoir())
+/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
+ final def queue: Queue[Envelope] = this
+ }*/
+ val messageQueue = new MonitoredMessageQueue(delegate, histogram)
+
+ def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem))
+ }*/
+}
diff --git a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
index 4fe9e617..de65aaca 100644
--- a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
@@ -1,9 +1,8 @@
package kamon.instrumentation
import scala.concurrent.{Await, Promise, Future}
-import org.scalatest.{OptionValues, WordSpec}
-import org.scalatest.matchers.MustMatchers
-import org.scalatest.concurrent.PatienceConfiguration
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration}
import kamon.{Kamon, TraceContext}
import java.util.UUID
import scala.util.Success
@@ -12,7 +11,7 @@ import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
-class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues {
+class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
"a instrumented runnable" when {
"created in a thread that does have a TraceContext" must {
@@ -20,7 +19,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF
"should be available during the run method execution" in { new FutureWithContextFixture {
whenReady(futureWithContext) { result =>
- result.value must be === testContext
+ result.value should equal(testContext)
}
}}
@@ -32,7 +31,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF
})
whenReady(onCompleteContext.future) { result =>
- result must be === testContext
+ result should equal(testContext)
}
}}
}
@@ -42,7 +41,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF
"not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{
whenReady(futureWithoutContext) { result =>
- result must be === None
+ result should equal(None)
}
}}
@@ -54,7 +53,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF
})
whenReady(onCompleteContext.future) { result =>
- result must be === None
+ result should equal(None)
}
}}
}
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
new file mode 100644
index 00000000..e117db1b
--- /dev/null
+++ b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
@@ -0,0 +1,14 @@
+package kamon.logging
+
+import akka.actor.Actor
+import kamon.Kamon
+
+trait UowActorLogging {
+ self: Actor =>
+
+ def logWithUOW(text: String) = {
+ val uow = Kamon.context.map(_.userContext).getOrElse("NA")
+ println(s"=======>[$uow] - $text")
+ }
+
+}
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
new file mode 100644
index 00000000..e79602ea
--- /dev/null
+++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
@@ -0,0 +1,28 @@
+package kamon.logging
+
+import java.util.concurrent.atomic.AtomicLong
+import spray.routing.Directive0
+import spray.routing.directives.BasicDirectives
+import java.net.InetAddress
+import scala.util.Try
+import kamon.Kamon
+
+trait UowDirectives extends BasicDirectives {
+ def uow: Directive0 = mapRequest { request =>
+ val generatedUow = Some(UowDirectives.newUow)
+ println("Generated UOW: "+generatedUow)
+ Kamon.set(Kamon.newTraceContext().copy(userContext = generatedUow))
+
+
+ request
+ }
+}
+
+object UowDirectives {
+ val uowCounter = new AtomicLong
+
+ val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName.toString).getOrElse("unknown-localhost")
+
+ def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet())
+
+}
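
A hedged sketch of how the uow directive above might be mixed into a spray-routing service; the trait and route below are assumptions for illustration, not part of this commit.

import spray.routing.HttpService
import kamon.logging.UowDirectives

// Illustrative sketch: every request through this route gets a fresh UOW-tagged TraceContext.
trait SampleUowService extends HttpService with UowDirectives {
  val sampleRoute =
    uow {
      path("status") {
        get {
          complete("OK")
        }
      }
    }
}
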
diff --git a/project/Build.scala b/project/Build.scala
index 37765ccf..c2822185 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -7,15 +7,24 @@ object Build extends Build {
import Settings._
import Dependencies._
- lazy val root = Project("kamon", file("."))
+ lazy val root = Project("root", file("."))
+ .aggregate(kamonCore, kamonUow)
+ .settings(basicSettings: _*)
+
+ lazy val kamonCore = Project("kamon-core", file("kamon-core"))
.settings(basicSettings: _*)
.settings(revolverSettings: _*)
.settings(aspectJSettings: _*)
.settings(newrelicSettings: _*)
+
.settings(
libraryDependencies ++=
- compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++
- test(scalatest, sprayTestkit))
-
+ compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++
+ test(scalatest, akkaTestKit, sprayTestkit))
+ lazy val kamonUow = Project("kamon-uow", file("kamon-uow"))
+ .settings(basicSettings: _*)
+ .settings(libraryDependencies ++=
+ compile(akkaActor, akkaSlf4j, sprayRouting))
+ .dependsOn(kamonCore)
} \ No newline at end of file
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index a0d51a39..c3162065 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -7,22 +7,22 @@ object Dependencies {
"spray nightlies repo" at "http://nightlies.spray.io"
)
- val sprayCan = "io.spray" % "spray-can" % "1.1-20130509"
- val sprayRouting = "io.spray" % "spray-routing" % "1.1-20130509"
- val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-20130509"
- val sprayClient = "io.spray" % "spray-client" % "1.1-20130509"
- val sprayServlet = "io.spray" % "spray-servlet" % "1.1-20130509"
+ val sprayCan = "io.spray" % "spray-can" % "1.1-M8"
+ val sprayRouting = "io.spray" % "spray-routing" % "1.1-M8"
+ val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-M8"
+ val sprayClient = "io.spray" % "spray-client" % "1.1-M8"
+ val sprayServlet = "io.spray" % "spray-servlet" % "1.1-M8"
val sprayJson = "io.spray" %% "spray-json" % "1.2.3"
val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1"
- val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2"
- val akkaAgent = "com.typesafe.akka" %% "akka-agent" % "2.1.2"
- val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2"
- val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2"
- val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M5b"
+ val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.2.0"
+ val akkaAgent = "com.typesafe.akka" %% "akka-agent" % "2.2.0"
+ val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.2.0"
+ val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.2.0"
+ val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M6-SNAP22"
val logback = "ch.qos.logback" % "logback-classic" % "1.0.10"
val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2"
- val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0-BETA2"
- val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2"
+ val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0"
+ val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0"
def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile")
def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided")
diff --git a/project/NewRelic.scala b/project/NewRelic.scala
index 766eb28d..74f8fc30 100644
--- a/project/NewRelic.scala
+++ b/project/NewRelic.scala
@@ -8,6 +8,6 @@ object NewRelic {
lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq(
javaOptions in run <++= jvmOptions in newrelic,
- newrelicVersion in newrelic := "2.18.0"
+ newrelicVersion in newrelic := "2.19.0"
)
}
diff --git a/project/Settings.scala b/project/Settings.scala
index 640a8013..5fadc25d 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -7,8 +7,8 @@ object Settings {
lazy val basicSettings = seq(
version := VERSION,
- organization := "com.despegar",
- scalaVersion := "2.10.0",
+ organization := "kamon",
+ scalaVersion := "2.10.2",
resolvers ++= Dependencies.resolutionRepos,
fork in run := true,
scalacOptions := Seq(
diff --git a/project/plugins.sbt b/project/plugins.sbt
index f8ce9e3c..b8910961 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -8,3 +8,4 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.0")
addSbtPlugin("com.ivantopo.sbt" % "sbt-newrelic" % "0.0.1")
+
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
deleted file mode 100644
index 1413f424..00000000
--- a/src/main/resources/META-INF/aop.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
-
-<aspectj>
- <!--<weaver options="-verbose -showWeaveInfo">
- <dump within="*"/>
- </weaver>-->
-
- <aspects>
-
- <!--<aspect name="akka.ActorSystemAspect"/>
- &lt;!&ndash;<aspect name="akka.MailboxAspect"/>&ndash;&gt;-->
- <!--<aspect name="akka.PoolMonitorInstrumentation"/>-->
- <aspect name="akka.ActorInstrumentation" />
- <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
- <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
- <aspect name="kamon.instrumentation.RunnableInstrumentation" />
-
- <include within="*"/>
- <exclude within="javax..*"/>
- <exclude within="org.aspectj..*"/>
- <exclude within="scala..*"/>
- <exclude within="scalaz..*"/>
- <exclude within="scalad..*"/>
- <exclude within="play..*"/>
- </aspects>
-
-</aspectj>
diff --git a/src/main/scala/akka/ActorInstrumentation.scala b/src/main/scala/akka/ActorInstrumentation.scala
deleted file mode 100644
index afe0e459..00000000
--- a/src/main/scala/akka/ActorInstrumentation.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package akka
-
-import actor.ActorCell
-import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect}
-import org.aspectj.lang.ProceedingJoinPoint
-import kamon.metric.Metrics.{ metricsRegistry => meterRegistry }
-import com.codahale.metrics.Meter
-import kamon.metric.MetricsUtils._
-
-@Aspect("perthis(actorCellCreation(*))")
-class ActorInstrumentation {
-
- /**
- * Aspect members
- */
-
- private val actorMeter:Meter = new Meter
-
- /**
- * Pointcuts
- */
- @Pointcut("execution(akka.actor.ActorCell+.new(..)) && this(actor)")
- def actorCellCreation(actor:ActorCell):Unit = {}
-
- @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
- def actorReceive():Unit = {}
-
- /**
- * Advices
- */
- @After("actorCellCreation(actor)")
- def afterCellCreation(actor:ActorCell):Unit ={
- val actorName:String = actor.self.path.toString
-
- meterRegistry.register(s"meter-for-${actorName}", actorMeter)
- }
-
- @Around("actorReceive()")
- def around(pjp: ProceedingJoinPoint) = {
- import pjp._
-
- markMeter(actorMeter) {
- proceed
- }
- }
- }
\ No newline at end of file
diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala
deleted file mode 100644
index 9d1d515d..00000000
--- a/src/main/scala/akka/ActorSystemAspect.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package akka
-
-import org.aspectj.lang.annotation._
-import actor.ActorSystemImpl
-
-@Aspect
-class ActorSystemAspect {
- println("Created ActorSystemAspect")
-
- @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)")
- protected def actorSystem():Unit = {}
-
- @After("actorSystem() && args(system)")
- def collectActorSystem(system: ActorSystemImpl):Unit = {
- Tracer.collectActorSystem(system)
- Tracer.start()
- }
-}
diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala
deleted file mode 100644
index 5ca6d6ab..00000000
--- a/src/main/scala/akka/MailboxAspect.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package akka
-
-import org.aspectj.lang.annotation._
-
-@Aspect("perthis(mailboxMonitor())")
-class MailboxAspect {
- println("Created MailboxAspect")
-
- @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)")
- protected def mailboxMonitor():Unit = {}
-
- @After("mailboxMonitor() && this(mb)")
- def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = {
- Tracer.collectMailbox(mb)
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/akka/MailboxMetrics.scala b/src/main/scala/akka/MailboxMetrics.scala
deleted file mode 100644
index 6bf65cc7..00000000
--- a/src/main/scala/akka/MailboxMetrics.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package akka
-
-import akka.dispatch.Mailbox
-import com.newrelic.api.agent.NewRelic
-
-case class MailboxMetrics(mailboxes:Map[String,Mailbox])
-
-
-object MailboxMetrics {
- def apply(mailboxes: List[Mailbox]) = {
- new MailboxMetrics(mailboxes.take(mailboxes.length - 1).map{m => (m.actor.self.path.toString -> m)}.toMap) //TODO: research why an ActorSystemImpl is collected
- }
-
- def toMap(mb: Mailbox):Map[String,Int] = Map[String,Int](
- "NumberOfMessages" -> mb.numberOfMessages,
- "MailboxDispatcherThroughput" -> mb.dispatcher.throughput,
- "SuspendCount" -> mb.suspendCount
- )
-}
-
-class MailboxSenderMetrics(mailboxes:List[Mailbox]) extends Runnable {
- def run() {
- val mbm = MailboxMetrics(mailboxes)
- mbm.mailboxes.map { case(actorName,mb) => {
- println(s"Sending metrics to Newrelic MailBoxMonitor for Actor -> ${actorName}")
-
- MailboxMetrics.toMap(mb).map {case(property, value) =>
- NewRelic.recordMetric(s"${actorName}:Mailbox:${property}", value)
- }
- }
- }
- }
-}
-
-
diff --git a/src/main/scala/akka/PoolMetrics.scala b/src/main/scala/akka/PoolMetrics.scala
deleted file mode 100644
index 422e34fd..00000000
--- a/src/main/scala/akka/PoolMetrics.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package akka
-
-import scala.concurrent.forkjoin.ForkJoinPool
-import com.newrelic.api.agent.NewRelic
-
-case class PoolMetrics(poolName:String, data:Map[String,Int])
-
-object PoolMetrics {
- def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool))
-
- def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int](
- "ActiveThreadCount" -> pool.getActiveThreadCount,
- "Parallelism" -> pool.getParallelism,
- "PoolSize" -> pool.getPoolSize,
- "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount,
- "StealCount" -> pool.getStealCount.toInt,
- "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt,
- "RunningThreadCount" -> pool.getRunningThreadCount
- )
-}
-
-class PoolMetricsSender(forkJoinPool:ForkJoinPool) extends Runnable {
- def run() {
- val pool = PoolMetrics(forkJoinPool)
- println(s"Sending Metrics to NewRelic -> ${pool}")
- pool.data.map{case(k,v) => NewRelic.recordMetric(s"${pool.poolName}:${k}",v)}
- }
-}
-
diff --git a/src/main/scala/akka/PoolMonitorInstrumentation.scala b/src/main/scala/akka/PoolMonitorInstrumentation.scala
deleted file mode 100644
index 167083e8..00000000
--- a/src/main/scala/akka/PoolMonitorInstrumentation.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package akka
-
-import org.aspectj.lang.annotation._
-
-@Aspect("perthis(poolMonitor(*))")
-class PoolMonitorAspect {
- println("Created PoolMonitorAspect")
-
- @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)")
- protected def poolMonitor(pool:scala.concurrent.forkjoin.ForkJoinPool):Unit = {}
-
- @After("poolMonitor(pool)")
- def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {
-
- }
-}
diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala
deleted file mode 100644
index 3b301247..00000000
--- a/src/main/scala/akka/Tracer.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package akka
-
-import actor.ActorSystemImpl
-import scala.concurrent.forkjoin.ForkJoinPool
-import scala.concurrent.duration._
-import akka.dispatch.Mailbox
-import scala._
-
-object Tracer {
- protected[this] var mailboxes:List[Mailbox] = Nil
- protected[this] var tracerActorSystem: ActorSystemImpl = _
- protected[this] var forkJoinPool:ForkJoinPool = _
-
- def collectPool(pool: ForkJoinPool) = forkJoinPool = pool
- def collectActorSystem(actorSystem: ActorSystemImpl) = tracerActorSystem = actorSystem
- def collectMailbox(mb: akka.dispatch.Mailbox) = mailboxes ::= mb
-
- def start():Unit ={
- implicit val dispatcher = tracerActorSystem.dispatcher
-
- tracerActorSystem.scheduler.schedule(6 seconds, 5 second, new MailboxSenderMetrics(mailboxes))
- tracerActorSystem.scheduler.schedule(7 seconds, 5 second, new PoolMetricsSender(forkJoinPool))
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
deleted file mode 100644
index f631b79a..00000000
--- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-package akka.instrumentation
-
-import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
-import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{ActorRef}
-import kamon.{Kamon, TraceContext}
-import akka.dispatch.Envelope
-
-case class TraceableMessage(traceContext: TraceContext, message: Any)
-
-
-@Aspect
-class ActorRefTellInstrumentation {
- println("Created ActorAspect")
-
- @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)")
- def sendingMessageToActorRef(message: Any, sender: ActorRef) = {}
-
- @Around("sendingMessageToActorRef(message, sender)")
- def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
- import pjp._
-
- Kamon.context() match {
- case Some(ctx) => {
- val traceableMessage = TraceableMessage(ctx, message)
- proceed(getArgs.updated(0, traceableMessage))
- }
- case None => proceed
- }
- }
-}
-
-
-@Aspect
-class ActorCellInvokeInstrumentation {
-
- @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
- def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
-
- @Around("invokingActorBehaviourAtActorCell(envelope)")
- def around(pjp: ProceedingJoinPoint, envelope: Envelope) = {
- import pjp._
-
- envelope match {
- case Envelope(TraceableMessage(ctx, msg), sender) => {
- Kamon.set(ctx)
-
- val originalEnvelope = envelope.copy(message = msg)
- proceed(getArgs.updated(0, originalEnvelope))
-
- Kamon.clear
- }
- case _ => proceed
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala
deleted file mode 100644
index ef5f8044..00000000
--- a/src/main/scala/kamon/Kamon.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-package kamon
-
-import akka.actor.{Props, ActorSystem}
-
-object Kamon {
-
- val ctx = new ThreadLocal[Option[TraceContext]] {
- override def initialValue() = None
- }
-
- implicit lazy val actorSystem = ActorSystem("kamon")
-
-
- def context() = ctx.get()
- def clear = ctx.remove()
- def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
-
- def start = set(newTraceContext)
- def stop = ctx.get match {
- case Some(context) => context.close
- case None =>
- }
-
- def newTraceContext(): TraceContext = TraceContext()
-
-
- val publisher = actorSystem.actorOf(Props[TransactionPublisher])
-
- def publish(tx: FullTransaction) = publisher ! tx
-
-}
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
deleted file mode 100644
index cf04659b..00000000
--- a/src/main/scala/kamon/metric/Metrics.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package kamon.metric
-
-import java.util.concurrent.TimeUnit
-import com.codahale.metrics._
-
-object Metrics {
- val metricsRegistry: MetricRegistry = new MetricRegistry
-
- val consoleReporter = ConsoleReporter.forRegistry(metricsRegistry)
- val newrelicReporter = NewRelicReporter(metricsRegistry)
-
- newrelicReporter.start(5, TimeUnit.SECONDS)
- consoleReporter.build().start(5, TimeUnit.SECONDS)
-}
\ No newline at end of file
diff --git a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
deleted file mode 100644
index 4cc15a2f..00000000
--- a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package akka.instrumentation
-
-import org.scalatest.WordSpec
-import org.scalatest.matchers.{ShouldMatchers, MustMatchers}
-import akka.actor.{Actor, Props, ActorSystem}
-import kamon.metric.Metrics._
-import scala.collection.JavaConverters._
-
-
-class ActorInstrumentationSpec extends WordSpec with MustMatchers with ShouldMatchers {
- val system = ActorSystem()
- import system._
-
- val echoRef = actorOf(Props(new EchoActor), "Echo-Actor")
- val meterForEchoActor = "meter-for-akka://default/user/Echo-Actor"
- val totalMessages = 1000
-
- "an instrumented Actor" should {
- "send a message and record a metric on the Metrics Registry with the number of sent messages" in {
-
- (1 to totalMessages).foreach {x:Int =>
- echoRef ! s"Message ${x}"
- }
-
- // to ensure that all messages were received
- Thread.sleep(1000)
-
- val messages = metricsRegistry.getMeters.asScala.get(meterForEchoActor).get.getCount
-
- messages should equal(totalMessages)
- }
- }
-
-}
-
-class EchoActor extends Actor {
- def receive = {
- case msg ⇒ sender ! msg
- }
-}
-
-
diff --git a/src/test/scala/kamon/instrumentation/ScalaFutures.scala b/src/test/scala/kamon/instrumentation/ScalaFutures.scala
deleted file mode 100644
index 169b709c..00000000
--- a/src/test/scala/kamon/instrumentation/ScalaFutures.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package kamon.instrumentation
-
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-import org.scalatest.concurrent.Futures
-import java.util.concurrent.TimeUnit
-
-trait ScalaFutures extends Futures {
- implicit def scalaFutureToFutureConcept[T](future: Future[T]): FutureConcept[T] = new FutureConcept[T] {
- def eitherValue: Option[Either[Throwable, T]] = {
- if(!future.isCompleted)
- None
- else
- future.value match {
- case None => None
- case Some(t) => t match {
- case Success(v) => Some(Right(v))
- case Failure(e) => Some(Left(e))
- }
- }
- }
-
- def isExpired: Boolean = false // Scala futures can't expire
-
- def isCanceled: Boolean = false // Scala futures cannot be cancelled
-
- override def futureValue(implicit config: PatienceConfig): T = {
- Await.result(future, Duration(config.timeout.totalNanos, TimeUnit.NANOSECONDS))
- }
- }
-}