aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala')
-rw-r--r--src/main/scala/kamon/Kamon.scala64
-rw-r--r--src/main/scala/kamon/TraceContext.scala1
-rw-r--r--src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala13
-rw-r--r--src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala4
-rw-r--r--src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala4
-rw-r--r--src/main/scala/kamon/metric/Metrics.scala8
6 files changed, 80 insertions, 14 deletions
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala
index 9946a1fd..5a1382a4 100644
--- a/src/main/scala/kamon/Kamon.scala
+++ b/src/main/scala/kamon/Kamon.scala
@@ -1,9 +1,11 @@
package kamon
-import akka.actor.{Props, ActorSystem}
+import akka.actor.{Actor, Props, ActorSystem}
import scala.collection.JavaConverters._
import java.util.concurrent.ConcurrentHashMap
-import kamon.metric.{Atomic, ActorSystemMetrics}
+import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import com.newrelic.api.agent.NewRelic
object Kamon {
@@ -42,6 +44,11 @@ object Kamon {
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
}
+
+
+ val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+
}
@@ -68,5 +75,58 @@ object Tracer {
}
//def newTraceContext(): TraceContext = TraceContext()
+}
+
+
+class MetricManager extends Actor {
+ implicit val ec = context.system.dispatcher
+ def receive = {
+ case RegisterForAllDispatchers(frequency) => {
+ val subscriber = sender
+ context.system.scheduler.schedule(frequency, frequency) {
+ Kamon.Metric.actorSystems.foreach {
+ case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach {
+ case (dispatcherName, dispatcherMetrics) => {
+ val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
+ val poolSize = dispatcherMetrics.poolSize.snapshot
+ val queueSize = dispatcherMetrics.queueSize.snapshot
+
+ subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)
+
+ }
+ }
+ }
+ }
+ }
+ }
}
+
+case class RegisterForAllDispatchers(frequency: FiniteDuration)
+case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot)
+
+
+
+
+
+
+class NewrelicReporterActor extends Actor {
+ import scala.concurrent.duration._
+
+ Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+
+ def receive = {
+ case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
+ println("PUBLISHED DISPATCHER STATS")
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat)
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat))
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)
+
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat)
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat))
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat)
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
index 0bfcd74b..6b32550f 100644
--- a/src/main/scala/kamon/TraceContext.scala
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -20,6 +20,7 @@ case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]],
}
object TraceContext {
+ implicit val as2 = Kamon.actorSystem.dispatcher
def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
}
diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 6677f0f7..7398a2bd 100644
--- a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -4,7 +4,7 @@ import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
import kamon.{Kamon, TraceContext}
-import akka.dispatch.Envelope
+import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
import kamon.metric.{MetricDirectory, Metrics}
import com.codahale.metrics
@@ -38,11 +38,13 @@ class ActorCellInvokeInstrumentation {
var processingTimeTimer: Timer = _
var shouldTrack = false
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)")
- def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {}
+ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
- @After("actorCellCreation(system, ref, props, parent)")
- def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
val actorName = MetricDirectory.nameForActor(ref)
val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
@@ -73,6 +75,7 @@ class ActorCellInvokeInstrumentation {
ctx match {
case Some(c) => {
Kamon.set(c)
+ //println("ENVELOPE ORIGINAL:---------------->"+originalEnvelope)
pjp.proceedWith(originalEnvelope)
Kamon.clear
}
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index 1f3564d3..b4f8a475 100644
--- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -28,6 +28,7 @@ class ActorSystemInstrumentation {
@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
class ForkJoinPoolInstrumentation {
var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _
@Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
@@ -42,6 +43,7 @@ class ForkJoinPoolInstrumentation {
val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
for(m <- metrics) {
activeThreadsHistogram = m.activeThreadCount
+ poolSizeHistogram = m.poolSize
println(s"Registered $dispatcherName for actor system $actorSystemName")
}
}
@@ -59,7 +61,7 @@ class ForkJoinPoolInstrumentation {
@After("forkJoinScan(fjp)")
def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
activeThreadsHistogram.update(fjp.getActiveThreadCount)
- println("UPDATED THE COUNT TWOOOO!!!")
+ poolSizeHistogram.update(fjp.getPoolSize)
}
diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
index 75d6189c..c21502ac 100644
--- a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -1,7 +1,7 @@
package kamon.instrumentation
import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
-import akka.dispatch.{Envelope, MessageQueue}
+import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
import akka.actor.{ActorSystem, ActorRef}
import kamon.metric.{Metrics, MetricDirectory}
@@ -44,7 +44,7 @@ class MessageQueueInstrumentation {
}
-class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue {
+class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
def enqueue(receiver: ActorRef, handle: Envelope) = {
delegate.enqueue(receiver, handle)
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
index 46809d8f..3992ab43 100644
--- a/src/main/scala/kamon/metric/Metrics.scala
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -10,11 +10,10 @@ object Metrics {
val registry: MetricRegistry = new MetricRegistry
val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
- val newrelicReporter = NewRelicReporter(registry)
+ //consoleReporter.build().start(45, TimeUnit.SECONDS)
+ //val newrelicReporter = NewRelicReporter(registry)
//newrelicReporter.start(5, TimeUnit.SECONDS)
- consoleReporter.build().start(10, TimeUnit.SECONDS)
-
def include(name: String, metric: Metric) = registry.register(name, metric)
@@ -84,7 +83,8 @@ trait HistogramSnapshot {
case class ActorSystemMetrics(actorSystemName: String) {
- val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector]
+ import scala.collection.JavaConverters._
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala
private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())