aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/META-INF/aop.xml5
-rw-r--r--src/main/scala/kamon/Kamon.scala1
-rw-r--r--src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala43
-rw-r--r--src/main/scala/kamon/metric/Metrics.scala6
-rw-r--r--src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala3
-rw-r--r--src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala17
6 files changed, 68 insertions, 7 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index b0a1d40d..38e5bb73 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -1,8 +1,8 @@
<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
- <weaver options="-verbose">
- <!--<dump within="*" beforeandafter="true"/>-->
+ <weaver options="-verbose -showWeaveInfo">
+ <dump within="*" beforeandafter="true"/>
</weaver>
<aspects>
@@ -18,6 +18,7 @@
<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
<aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>-->
<aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
+ <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala
index 8fb3c24a..9946a1fd 100644
--- a/src/main/scala/kamon/Kamon.scala
+++ b/src/main/scala/kamon/Kamon.scala
@@ -36,6 +36,7 @@ object Kamon {
object Metric {
val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+ def actorSystemNames: List[String] = actorSystems.keys.toList
def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index 6c79806d..1f3564d3 100644
--- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -8,6 +8,8 @@ import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, Exec
import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
import com.typesafe.config.Config
import kamon.Kamon
+import scala.concurrent.forkjoin.ForkJoinPool
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
@Aspect
@@ -23,6 +25,47 @@ class ActorSystemInstrumentation {
}
}
+@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
+class ForkJoinPoolInstrumentation {
+ var activeThreadsHistogram: Histogram = _
+
+ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
+ def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
+
+ @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
+ def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
+ val (actorSystemName, dispatcherName) = threadFactory match {
+ case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
+ case _ => ("Unknown", "Unknown")
+ }
+
+ val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
+ for(m <- metrics) {
+ activeThreadsHistogram = m.activeThreadCount
+ println(s"Registered $dispatcherName for actor system $actorSystemName")
+ }
+ }
+
+ def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
+ knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown"))
+ }
+
+
+
+
+ @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
+ def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
+
+ @After("forkJoinScan(fjp)")
+ def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
+ activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ println("UPDATED THE COUNT TWOOOO!!!")
+ }
+
+
+
+}
+
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
index b0dc8ec5..46809d8f 100644
--- a/src/main/scala/kamon/metric/Metrics.scala
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -88,7 +88,11 @@ case class ActorSystemMetrics(actorSystemName: String) {
private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
- def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = Some(createDispatcherCollector)
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
+ val stats = createDispatcherCollector
+ dispatchers.put(dispatcherName, stats)
+ Some(stats)
+ }
}
diff --git a/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
index 489f3c1c..1eab6355 100644
--- a/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
+++ b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
@@ -16,8 +16,7 @@ class ActorSystemInstrumentationSpec extends WordSpec with Matchers {
Kamon.Metric.actorSystem("as1") should not be (None)
Kamon.Metric.actorSystem("as2") should not be (None)
- /*assert(Kamon.Metric.actorSystem("as2") != null)
- assert(Kamon.Metric.actorSystem("as3") === null)*/
+ Kamon.Metric.actorSystem("unknown") should be (None)
}
}
}
diff --git a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
index d72989f6..7a14af6c 100644
--- a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
+++ b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
@@ -1,14 +1,21 @@
package kamon.instrumentation
import org.scalatest.{Matchers, WordSpec}
-import akka.actor.ActorSystem
+import akka.actor.{Actor, Props, ActorSystem}
import kamon.metric.MetricDirectory
+import kamon.Kamon
class DispatcherInstrumentationSpec extends WordSpec with Matchers{
"the dispatcher instrumentation" should {
- "instrument a dispatcher that belongs to a non-filtered actor system" in {
+ "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem {
+ val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers
+ (1 to 10).foreach(actor ! _)
+
+ val active = x.get("akka.actor.default-dispatcher").activeThreadCount.snapshot
+ println("Active max: "+active.max)
+ println("Active min: "+active.min)
}
}
@@ -16,6 +23,12 @@ class DispatcherInstrumentationSpec extends WordSpec with Matchers{
trait SingleDispatcherActorSystem {
val actorSystem = ActorSystem("single-dispatcher")
+ val actor = actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case a => sender ! a; println("BAAAANG")
+ }
+ }))
+
}
}