aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-07-15 18:14:07 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-07-15 18:14:07 -0300
commitafda5a6ac02a5cd314638e40250b49f66cf3b419 (patch)
treed0f2337f570d883497e78c482629f2af60c95366
parente8dd6c83986f1ecd2d717c39bffe900b23b68854 (diff)
downloadKamon-afda5a6ac02a5cd314638e40250b49f66cf3b419.tar.gz
Kamon-afda5a6ac02a5cd314638e40250b49f66cf3b419.tar.bz2
Kamon-afda5a6ac02a5cd314638e40250b49f66cf3b419.zip
still a disaster, need to sync
-rw-r--r--src/main/resources/META-INF/aop.xml9
-rw-r--r--src/main/scala/kamon/Kamon.scala19
-rw-r--r--src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala54
-rw-r--r--src/main/scala/kamon/metric/Metrics.scala57
-rw-r--r--src/main/scala/kamon/metric/NewRelicReporter.scala11
-rw-r--r--src/main/scala/spraytest/ClientTest.scala4
-rw-r--r--src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala23
-rw-r--r--src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala6
8 files changed, 139 insertions, 44 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index 152270a7..b0a1d40d 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -1,8 +1,8 @@
<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
- <weaver options="-verbose -showWeaveInfo">
- <dump within="*" beforeandafter="true"/>
+ <weaver options="-verbose">
+ <!--<dump within="*" beforeandafter="true"/>-->
</weaver>
<aspects>
@@ -15,8 +15,9 @@
<aspect name="kamon.instrumentation.InceptionAspect"/>
<!-- ExecutorService Instrumentation for Akka. -->
- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
- <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>
+<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
+ <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>-->
+ <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala
index c1b97722..8fb3c24a 100644
--- a/src/main/scala/kamon/Kamon.scala
+++ b/src/main/scala/kamon/Kamon.scala
@@ -1,6 +1,9 @@
package kamon
import akka.actor.{Props, ActorSystem}
+import scala.collection.JavaConverters._
+import java.util.concurrent.ConcurrentHashMap
+import kamon.metric.{Atomic, ActorSystemMetrics}
object Kamon {
@@ -28,10 +31,26 @@ object Kamon {
def publish(tx: FullTransaction) = publisher ! tx
+
+
+ object Metric {
+ val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+
+ def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
+
+ def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
+ }
+
}
+
+
+
+
+
+
object Tracer {
val ctx = new ThreadLocal[Option[TraceContext]] {
override def initialValue() = None
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index 3ace3e77..6c79806d 100644
--- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -4,8 +4,52 @@ import org.aspectj.lang.annotation._
import java.util.concurrent._
import org.aspectj.lang.ProceedingJoinPoint
import java.util
-import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector}
+import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector}
import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
+import com.typesafe.config.Config
+import kamon.Kamon
+
+
+@Aspect
+class ActorSystemInstrumentation {
+
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
+ def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
+
+ @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
+ def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
+
+ Kamon.Metric.registerActorSystem(name)
+ }
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
/**
@@ -34,6 +78,14 @@ trait WatchedExecutorService {
+trait ExecutorServiceMonitoring {
+ def dispatcherMetrics: DispatcherMetricCollector
+}
+
+class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
+ @volatile var dispatcherMetrics: DispatcherMetricCollector = _
+}
+
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
index 352c51a0..b0dc8ec5 100644
--- a/src/main/scala/kamon/metric/Metrics.scala
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -1,19 +1,12 @@
package kamon.metric
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
-import com.codahale.metrics._
import akka.actor.ActorRef
-import java.util.concurrent.atomic.AtomicReference
import com.codahale.metrics
-
-trait MetricDepot {
- def include(name: String, metric: Metric): Unit
- def exclude(name: String): Unit
-}
+import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}
-
-object Metrics extends MetricDepot {
+object Metrics {
val registry: MetricRegistry = new MetricRegistry
val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
@@ -64,48 +57,54 @@ object MetricDirectory {
-case class ActorSystemMetrics(actorSystemName: String) {
- val dispatchers = new ConcurrentHashMap[String, DispatcherMetrics]
- def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
- ???
- }
-}
-case class DispatcherMetricCollector(activeThreadCount: ValueDistributionCollector, poolSize: ValueDistributionCollector, queueSize: ValueDistributionCollector)
-trait ValueDistributionCollector {
+case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
+
+
+
+
+trait Histogram {
def update(value: Long): Unit
- def snapshot: HistogramLike
+ def snapshot: HistogramSnapshot
}
-trait HistogramLike {
- def median: Long
- def max: Long
- def min: Long
+trait HistogramSnapshot {
+ def median: Double
+ def max: Double
+ def min: Double
}
-case class CodaHaleValueDistributionCollector extends ValueDistributionCollector {
- private[this] val histogram = new Histogram(new metrics.ExponentiallyDecayingReservoir())
- def median: Long = ???
+case class ActorSystemMetrics(actorSystemName: String) {
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector]
- def max: Long = ???
+ private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
- def min: Long = ???
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = Some(createDispatcherCollector)
+
+}
- def snapshot: HistogramLike = histogram.getSnapshot
+
+case class CodahaleHistogram() extends Histogram {
+ private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
def update(value: Long) = histogram.update(value)
-}
+ def snapshot: HistogramSnapshot = {
+ val snapshot = histogram.getSnapshot
+ CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
+ }
+}
+case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala
index 67ee1ba5..70f3e54a 100644
--- a/src/main/scala/kamon/metric/NewRelicReporter.scala
+++ b/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -1,6 +1,7 @@
package kamon.metric
-import com.codahale.metrics._
+import com.codahale.metrics
+import metrics._
import java.util.concurrent.TimeUnit
import java.util
import com.newrelic.api.agent.NewRelic
@@ -9,6 +10,8 @@ import scala.collection.JavaConverters._
class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
+
+
private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
}
@@ -25,7 +28,7 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt
}*/
- def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
+ def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
//Process Meters
meters.asScala.map{case(name, meter) => processMeter(name, meter)}
@@ -39,8 +42,10 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt
NewRelic.recordMetric(fullMetricName, measure)
}}
}
+
+
}
object NewRelicReporter {
- def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
+ def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
} \ No newline at end of file
diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala
index 7a95fc76..07532d0a 100644
--- a/src/main/scala/spraytest/ClientTest.scala
+++ b/src/main/scala/spraytest/ClientTest.scala
@@ -5,6 +5,8 @@ import spray.client.pipelining._
import spray.httpx.SprayJsonSupport
import spray.json._
import scala.concurrent.Future
+import spray.can.Http
+import akka.io.IO
/**
* BEGIN JSON Infrastructure
@@ -34,7 +36,7 @@ class ClientTest extends App {
import SprayJsonSupport._
-
+ val actor = IO(Http)
val pipeline = sendReceive ~> unmarshal[Container]
diff --git a/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
new file mode 100644
index 00000000..489f3c1c
--- /dev/null
+++ b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
@@ -0,0 +1,23 @@
+package kamon.instrumentation
+
+import org.scalatest.{Matchers, WordSpec}
+import akka.actor.ActorSystem
+import kamon.Kamon
+
+class ActorSystemInstrumentationSpec extends WordSpec with Matchers {
+
+ // TODO: Selection filters to exclude unwanted actor systems. Read from configuration.
+
+ "the actor system instrumentation" should {
+ "register all actor systems created" in {
+ val as1 = ActorSystem("as1")
+ val as2 = ActorSystem("as2")
+
+
+ Kamon.Metric.actorSystem("as1") should not be (None)
+ Kamon.Metric.actorSystem("as2") should not be (None)
+ /*assert(Kamon.Metric.actorSystem("as2") != null)
+ assert(Kamon.Metric.actorSystem("as3") === null)*/
+ }
+ }
+}
diff --git a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
index 517a4ce0..d72989f6 100644
--- a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
+++ b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
@@ -5,16 +5,10 @@ import akka.actor.ActorSystem
import kamon.metric.MetricDirectory
class DispatcherInstrumentationSpec extends WordSpec with Matchers{
- import MetricDirectory.dispatcherStats
"the dispatcher instrumentation" should {
"instrument a dispatcher that belongs to a non-filtered actor system" in {
- val defaultDispatcherStats = dispatcherStats("single-dispatcher", "akka.actor.default-dispatcher")
-
- defaultDispatcherStats should not be None
-
- //KamonMetrics.watch[Actor] named "ivan"
}
}