aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--project/Build.scala1
-rw-r--r--src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala90
-rw-r--r--src/main/scala/kamon/metric/MetricFilter.scala6
-rw-r--r--src/main/scala/kamon/metric/Metrics.scala83
-rw-r--r--src/main/scala/spraytest/FutureTesting.scala2
-rw-r--r--src/main/scala/test/PingPong.scala34
-rw-r--r--src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala27
7 files changed, 235 insertions, 8 deletions
diff --git a/project/Build.scala b/project/Build.scala
index f3e6e8da..7d89713c 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -12,6 +12,7 @@ object Build extends Build {
.settings(revolverSettings: _*)
.settings(aspectJSettings: _*)
.settings(newrelicSettings: _*)
+
.settings(
libraryDependencies ++=
compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index f3ee4ee7..3ace3e77 100644
--- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -8,6 +8,47 @@ import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector}
import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
+/**
+ * ExecutorService monitoring base:
+ */
+trait ExecutorServiceCollector {
+ def updateActiveThreadCount(diff: Int): Unit
+ def updateTotalThreadCount(diff: Int): Unit
+ def updateQueueSize(diff: Int): Unit
+}
+
+trait WatchedExecutorService {
+ def collector: ExecutorServiceCollector
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = delegate.createExecutorService
}
@@ -15,11 +56,13 @@ case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatch
@Aspect
class ExecutorServiceFactoryProviderInstrumentation {
- @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)")
- def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {}
+ @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
+ def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
+ true
+ }
- @Around("factoryMethodCall(id, threadFactory)")
- def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
+ @Around("factoryMethodCall(dispatcherName, threadFactory)")
+ def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
val actorSystemName = threadFactory match {
@@ -27,7 +70,7 @@ class ExecutorServiceFactoryProviderInstrumentation {
case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
}
- new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate)
+ new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
}
}
@@ -67,4 +110,39 @@ case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorServ
def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
def execute(command: Runnable) = delegate.execute(command)
-} \ No newline at end of file
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/main/scala/kamon/metric/MetricFilter.scala b/src/main/scala/kamon/metric/MetricFilter.scala
new file mode 100644
index 00000000..fb117968
--- /dev/null
+++ b/src/main/scala/kamon/metric/MetricFilter.scala
@@ -0,0 +1,6 @@
+package kamon.metric
+
+object MetricFilter {
+ def actorSystem(system: String): Boolean = !system.startsWith("kamon")
+ def actor(path: String, system: String): Boolean = true
+}
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
index 46fb2b64..352c51a0 100644
--- a/src/main/scala/kamon/metric/Metrics.scala
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -1,8 +1,10 @@
package kamon.metric
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
import com.codahale.metrics._
import akka.actor.ActorRef
+import java.util.concurrent.atomic.AtomicReference
+import com.codahale.metrics
trait MetricDepot {
def include(name: String, metric: Metric): Unit
@@ -38,6 +40,11 @@ object Metrics extends MetricDepot {
}
}
+object Watched {
+ case object Actor
+ case object Dispatcher
+}
+
object MetricDirectory {
def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
@@ -53,8 +60,82 @@ object MetricDirectory {
}
+}
+
+
+
+case class ActorSystemMetrics(actorSystemName: String) {
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetrics]
+
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
+ ???
+ }
+
+}
+
+
+
+case class DispatcherMetricCollector(activeThreadCount: ValueDistributionCollector, poolSize: ValueDistributionCollector, queueSize: ValueDistributionCollector)
+
+
+trait ValueDistributionCollector {
+ def update(value: Long): Unit
+ def snapshot: HistogramLike
}
+trait HistogramLike {
+ def median: Long
+ def max: Long
+ def min: Long
+}
+
+case class CodaHaleValueDistributionCollector extends ValueDistributionCollector {
+ private[this] val histogram = new Histogram(new metrics.ExponentiallyDecayingReservoir())
+
+ def median: Long = ???
+
+ def max: Long = ???
+
+ def min: Long = ???
+
+ def snapshot: HistogramLike = histogram.getSnapshot
+
+ def update(value: Long) = histogram.update(value)
+}
+
+
+
+
+
+
+
+
+
+/**
+ * Dispatcher Metrics that we care about currently with a histogram-like nature:
+ * - Work Queue Size
+ * - Total/Active Thread Count
+ */
+
+
+
+import annotation.tailrec
+import java.util.concurrent.atomic.AtomicReference
+
+object Atomic {
+ def apply[T]( obj : T) = new Atomic(new AtomicReference(obj))
+ implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref)
+}
+
+class Atomic[T](val atomic : AtomicReference[T]) {
+ @tailrec
+ final def update(f: T => T) : T = {
+ val oldValue = atomic.get()
+ val newValue = f(oldValue)
+ if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
+ }
+ def get() = atomic.get()
+} \ No newline at end of file
diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala
index f592f6d7..b864d6d6 100644
--- a/src/main/scala/spraytest/FutureTesting.scala
+++ b/src/main/scala/spraytest/FutureTesting.scala
@@ -63,7 +63,7 @@ object TraceableFuture {
implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
- val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil))
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil))
new TraceableFuture(Future { wrappedBody })
}
diff --git a/src/main/scala/test/PingPong.scala b/src/main/scala/test/PingPong.scala
new file mode 100644
index 00000000..f9d6869c
--- /dev/null
+++ b/src/main/scala/test/PingPong.scala
@@ -0,0 +1,34 @@
+package test
+
+import akka.actor.{Props, Actor, ActorSystem}
+
+object PingPong extends App {
+
+ val as = ActorSystem("ping-pong")
+
+ val pinger = as.actorOf(Props[Pinger])
+ val ponger = as.actorOf(Props[Ponger])
+
+ pinger.tell(Pong, ponger)
+
+
+ Thread.sleep(30000)
+ as.shutdown()
+
+
+}
+
+case object Ping
+case object Pong
+
+class Pinger extends Actor {
+ def receive = {
+ case Pong => sender ! Ping
+ }
+}
+
+class Ponger extends Actor {
+ def receive = {
+ case Ping => sender ! Pong
+ }
+}
diff --git a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
new file mode 100644
index 00000000..517a4ce0
--- /dev/null
+++ b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
@@ -0,0 +1,27 @@
+package kamon.instrumentation
+
+import org.scalatest.{Matchers, WordSpec}
+import akka.actor.ActorSystem
+import kamon.metric.MetricDirectory
+
+class DispatcherInstrumentationSpec extends WordSpec with Matchers{
+ import MetricDirectory.dispatcherStats
+
+
+ "the dispatcher instrumentation" should {
+ "instrument a dispatcher that belongs to a non-filtered actor system" in {
+ val defaultDispatcherStats = dispatcherStats("single-dispatcher", "akka.actor.default-dispatcher")
+
+ defaultDispatcherStats should not be None
+
+ //KamonMetrics.watch[Actor] named "ivan"
+
+ }
+ }
+
+
+ trait SingleDispatcherActorSystem {
+ val actorSystem = ActorSystem("single-dispatcher")
+ }
+}
+