diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-07-10 18:13:49 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-07-10 18:13:49 -0300 |
commit | e8dd6c83986f1ecd2d717c39bffe900b23b68854 (patch) | |
tree | 43e0feaa42225a3770922a9366e126590225719a /src/main/scala/kamon | |
parent | d1e22b3f446c89413c67421f19ab5215ebdfcd43 (diff) | |
download | Kamon-e8dd6c83986f1ecd2d717c39bffe900b23b68854.tar.gz Kamon-e8dd6c83986f1ecd2d717c39bffe900b23b68854.tar.bz2 Kamon-e8dd6c83986f1ecd2d717c39bffe900b23b68854.zip |
complete disaster, wip
Diffstat (limited to 'src/main/scala/kamon')
-rw-r--r-- | src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala | 90 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/MetricFilter.scala | 6 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 83 |
3 files changed, 172 insertions, 7 deletions
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index f3ee4ee7..3ace3e77 100644 --- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -8,6 +8,47 @@ import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +/** + * ExecutorService monitoring base: + */ +trait ExecutorServiceCollector { + def updateActiveThreadCount(diff: Int): Unit + def updateTotalThreadCount(diff: Int): Unit + def updateQueueSize(diff: Int): Unit +} + +trait WatchedExecutorService { + def collector: ExecutorServiceCollector +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = delegate.createExecutorService } @@ -15,11 +56,13 @@ case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatch @Aspect class ExecutorServiceFactoryProviderInstrumentation { - @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)") - def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {} + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") + def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { + true + } - @Around("factoryMethodCall(id, threadFactory)") - def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + @Around("factoryMethodCall(dispatcherName, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast val actorSystemName = threadFactory match { @@ -27,7 +70,7 @@ class ExecutorServiceFactoryProviderInstrumentation { case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. } - new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate) + new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) } } @@ -67,4 +110,39 @@ case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorServ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) def execute(command: Runnable) = delegate.execute(command) -}
\ No newline at end of file +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/scala/kamon/metric/MetricFilter.scala b/src/main/scala/kamon/metric/MetricFilter.scala new file mode 100644 index 00000000..fb117968 --- /dev/null +++ b/src/main/scala/kamon/metric/MetricFilter.scala @@ -0,0 +1,6 @@ +package kamon.metric + +object MetricFilter { + def actorSystem(system: String): Boolean = !system.startsWith("kamon") + def actor(path: String, system: String): Boolean = true +} diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index 46fb2b64..352c51a0 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -1,8 +1,10 @@ package kamon.metric -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} import com.codahale.metrics._ import akka.actor.ActorRef +import java.util.concurrent.atomic.AtomicReference +import com.codahale.metrics trait MetricDepot { def include(name: String, metric: Metric): Unit @@ -38,6 +40,11 @@ object Metrics extends MetricDepot { } } +object Watched { + case object Actor + case object Dispatcher +} + object MetricDirectory { def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" @@ -53,8 +60,82 @@ object MetricDirectory { } +} + + + +case class ActorSystemMetrics(actorSystemName: String) { + val dispatchers = new ConcurrentHashMap[String, DispatcherMetrics] + + def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { + ??? + } + +} + + + +case class DispatcherMetricCollector(activeThreadCount: ValueDistributionCollector, poolSize: ValueDistributionCollector, queueSize: ValueDistributionCollector) + + +trait ValueDistributionCollector { + def update(value: Long): Unit + def snapshot: HistogramLike } +trait HistogramLike { + def median: Long + def max: Long + def min: Long +} + +case class CodaHaleValueDistributionCollector extends ValueDistributionCollector { + private[this] val histogram = new Histogram(new metrics.ExponentiallyDecayingReservoir()) + + def median: Long = ??? + + def max: Long = ??? + + def min: Long = ??? + + def snapshot: HistogramLike = histogram.getSnapshot + + def update(value: Long) = histogram.update(value) +} + + + + + + + + + +/** + * Dispatcher Metrics that we care about currently with a histogram-like nature: + * - Work Queue Size + * - Total/Active Thread Count + */ + + + +import annotation.tailrec +import java.util.concurrent.atomic.AtomicReference + +object Atomic { + def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) + implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) +} + +class Atomic[T](val atomic : AtomicReference[T]) { + @tailrec + final def update(f: T => T) : T = { + val oldValue = atomic.get() + val newValue = f(oldValue) + if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) + } + def get() = atomic.get() +}
\ No newline at end of file |