diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-07-15 18:14:07 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-07-15 18:14:07 -0300 |
commit | afda5a6ac02a5cd314638e40250b49f66cf3b419 (patch) | |
tree | d0f2337f570d883497e78c482629f2af60c95366 /src | |
parent | e8dd6c83986f1ecd2d717c39bffe900b23b68854 (diff) | |
download | Kamon-afda5a6ac02a5cd314638e40250b49f66cf3b419.tar.gz Kamon-afda5a6ac02a5cd314638e40250b49f66cf3b419.tar.bz2 Kamon-afda5a6ac02a5cd314638e40250b49f66cf3b419.zip |
still a disaster, need to sync
Diffstat (limited to 'src')
8 files changed, 139 insertions, 44 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 152270a7..b0a1d40d 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -1,8 +1,8 @@ <!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> <aspectj> - <weaver options="-verbose -showWeaveInfo"> - <dump within="*" beforeandafter="true"/> + <weaver options="-verbose"> + <!--<dump within="*" beforeandafter="true"/>--> </weaver> <aspects> @@ -15,8 +15,9 @@ <aspect name="kamon.instrumentation.InceptionAspect"/> <!-- ExecutorService Instrumentation for Akka. --> - <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> - <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/> +<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> + <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>--> + <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala index c1b97722..8fb3c24a 100644 --- a/src/main/scala/kamon/Kamon.scala +++ b/src/main/scala/kamon/Kamon.scala @@ -1,6 +1,9 @@ package kamon import akka.actor.{Props, ActorSystem} +import scala.collection.JavaConverters._ +import java.util.concurrent.ConcurrentHashMap +import kamon.metric.{Atomic, ActorSystemMetrics} object Kamon { @@ -28,10 +31,26 @@ object Kamon { def publish(tx: FullTransaction) = publisher ! tx + + + object Metric { + val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala + + def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) + + def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) + } + } + + + + + + object Tracer { val ctx = new ThreadLocal[Option[TraceContext]] { override def initialValue() = None diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index 3ace3e77..6c79806d 100644 --- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -4,8 +4,52 @@ import org.aspectj.lang.annotation._ import java.util.concurrent._ import org.aspectj.lang.ProceedingJoinPoint import java.util -import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} +import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +import com.typesafe.config.Config +import kamon.Kamon + + +@Aspect +class ActorSystemInstrumentation { + + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") + def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} + + @After("actorSystemInstantiation(name, applicationConfig, classLoader)") + def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { + + Kamon.Metric.registerActorSystem(name) + } +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + /** @@ -34,6 +78,14 @@ trait WatchedExecutorService { +trait ExecutorServiceMonitoring { + def dispatcherMetrics: DispatcherMetricCollector +} + +class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { + @volatile var dispatcherMetrics: DispatcherMetricCollector = _ +} + diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index 352c51a0..b0dc8ec5 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -1,19 +1,12 @@ package kamon.metric import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} -import com.codahale.metrics._ import akka.actor.ActorRef -import java.util.concurrent.atomic.AtomicReference import com.codahale.metrics - -trait MetricDepot { - def include(name: String, metric: Metric): Unit - def exclude(name: String): Unit -} +import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} - -object Metrics extends MetricDepot { +object Metrics { val registry: MetricRegistry = new MetricRegistry val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) @@ -64,48 +57,54 @@ object MetricDirectory { -case class ActorSystemMetrics(actorSystemName: String) { - val dispatchers = new ConcurrentHashMap[String, DispatcherMetrics] - def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { - ??? - } -} -case class DispatcherMetricCollector(activeThreadCount: ValueDistributionCollector, poolSize: ValueDistributionCollector, queueSize: ValueDistributionCollector) -trait ValueDistributionCollector { +case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) + + + + +trait Histogram { def update(value: Long): Unit - def snapshot: HistogramLike + def snapshot: HistogramSnapshot } -trait HistogramLike { - def median: Long - def max: Long - def min: Long +trait HistogramSnapshot { + def median: Double + def max: Double + def min: Double } -case class CodaHaleValueDistributionCollector extends ValueDistributionCollector { - private[this] val histogram = new Histogram(new metrics.ExponentiallyDecayingReservoir()) - def median: Long = ??? +case class ActorSystemMetrics(actorSystemName: String) { + val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] - def max: Long = ??? + private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) - def min: Long = ??? + def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = Some(createDispatcherCollector) + +} - def snapshot: HistogramLike = histogram.getSnapshot + +case class CodahaleHistogram() extends Histogram { + private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) def update(value: Long) = histogram.update(value) -} + def snapshot: HistogramSnapshot = { + val snapshot = histogram.getSnapshot + CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) + } +} +case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala index 67ee1ba5..70f3e54a 100644 --- a/src/main/scala/kamon/metric/NewRelicReporter.scala +++ b/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -1,6 +1,7 @@ package kamon.metric -import com.codahale.metrics._ +import com.codahale.metrics +import metrics._ import java.util.concurrent.TimeUnit import java.util import com.newrelic.api.agent.NewRelic @@ -9,6 +10,8 @@ import scala.collection.JavaConverters._ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { + + private[NewRelicReporter] def processMeter(name: String, meter: Meter) { NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) } @@ -25,7 +28,7 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt }*/ - def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { //Process Meters meters.asScala.map{case(name, meter) => processMeter(name, meter)} @@ -39,8 +42,10 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt NewRelic.recordMetric(fullMetricName, measure) }} } + + } object NewRelicReporter { - def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) + def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) }
\ No newline at end of file diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala index 7a95fc76..07532d0a 100644 --- a/src/main/scala/spraytest/ClientTest.scala +++ b/src/main/scala/spraytest/ClientTest.scala @@ -5,6 +5,8 @@ import spray.client.pipelining._ import spray.httpx.SprayJsonSupport import spray.json._ import scala.concurrent.Future +import spray.can.Http +import akka.io.IO /** * BEGIN JSON Infrastructure @@ -34,7 +36,7 @@ class ClientTest extends App { import SprayJsonSupport._ - + val actor = IO(Http) val pipeline = sendReceive ~> unmarshal[Container] diff --git a/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala new file mode 100644 index 00000000..489f3c1c --- /dev/null +++ b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala @@ -0,0 +1,23 @@ +package kamon.instrumentation + +import org.scalatest.{Matchers, WordSpec} +import akka.actor.ActorSystem +import kamon.Kamon + +class ActorSystemInstrumentationSpec extends WordSpec with Matchers { + + // TODO: Selection filters to exclude unwanted actor systems. Read from configuration. + + "the actor system instrumentation" should { + "register all actor systems created" in { + val as1 = ActorSystem("as1") + val as2 = ActorSystem("as2") + + + Kamon.Metric.actorSystem("as1") should not be (None) + Kamon.Metric.actorSystem("as2") should not be (None) + /*assert(Kamon.Metric.actorSystem("as2") != null) + assert(Kamon.Metric.actorSystem("as3") === null)*/ + } + } +} diff --git a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala index 517a4ce0..d72989f6 100644 --- a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala +++ b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala @@ -5,16 +5,10 @@ import akka.actor.ActorSystem import kamon.metric.MetricDirectory class DispatcherInstrumentationSpec extends WordSpec with Matchers{ - import MetricDirectory.dispatcherStats "the dispatcher instrumentation" should { "instrument a dispatcher that belongs to a non-filtered actor system" in { - val defaultDispatcherStats = dispatcherStats("single-dispatcher", "akka.actor.default-dispatcher") - - defaultDispatcherStats should not be None - - //KamonMetrics.watch[Actor] named "ivan" } } |