diff options
author | Diego Parra <diegolparra@gmail.com> | 2015-10-23 19:34:28 -0300 |
---|---|---|
committer | Diego Parra <diegolparra@gmail.com> | 2015-10-23 19:34:28 -0300 |
commit | f3eefdb687358401965fdd793c3a68507e27aa12 (patch) | |
tree | c4b04a10ca7a23232e05bca74cc0fa57eea62903 | |
parent | ccb049c07fc28d25f389be72a242240fec1881a3 (diff) | |
parent | 98ff71de1d04c1bb65ca1d6220ddc5a075259248 (diff) | |
download | Kamon-f3eefdb687358401965fdd793c3a68507e27aa12.tar.gz Kamon-f3eefdb687358401965fdd793c3a68507e27aa12.tar.bz2 Kamon-f3eefdb687358401965fdd793c3a68507e27aa12.zip |
Merge pull request #258 from Tolsi/stolmachev-kamon-jmx
Kamon JMX was added
-rw-r--r-- | kamon-jmx/src/main/resources/reference.conf | 30 | ||||
-rw-r--r-- | kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala | 77 | ||||
-rw-r--r-- | kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala | 212 | ||||
-rw-r--r-- | kamon-playground/src/main/resources/application.conf | 1 | ||||
-rw-r--r-- | project/Projects.scala | 15 |
5 files changed, 333 insertions, 2 deletions
diff --git a/kamon-jmx/src/main/resources/reference.conf b/kamon-jmx/src/main/resources/reference.conf new file mode 100644 index 00000000..41bf366b --- /dev/null +++ b/kamon-jmx/src/main/resources/reference.conf @@ -0,0 +1,30 @@ +# ========================================== # +# Kamon-JMX-Reporter Reference Configuration # +# ========================================== # + +kamon { + + jmx { + subscriptions { + histogram = [ "**" ] + min-max-counter = [ "**" ] + gauge = [ "**" ] + counter = [ "**" ] + trace = [ "**" ] + trace-segment = [ "**" ] + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + http-server = [ "**" ] + } + } + + modules { + kamon-jmx { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.jmx.JMXReporter" + } + } +} diff --git a/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala new file mode 100644 index 00000000..e1e12aea --- /dev/null +++ b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= +*/ + +package kamon.jmx + +import javax.management.{JMException, JMRuntimeException} + +import akka.actor._ +import akka.event.Logging +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.util.ConfigTools.Syntax +import scala.collection.JavaConverters._ + +object JMXReporter extends ExtensionId[JMXExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = JMXReporter + + override def createExtension(system: ExtendedActorSystem): JMXExtension = new JMXExtension(system) +} + +class JMXExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val log = Logging(system, getClass) + log.info("Starting the Kamon(JMX) extension") + + val subscriber = system.actorOf(Props[JMXReporterSupervisor], "kamon-jmx-reporter") + + val jmxConfig = system.settings.config.getConfig("kamon.jmx") + val subscriptions = jmxConfig.getConfig("subscriptions") + + subscriptions.firstLevelKeys foreach { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon.metrics.subscribe(subscriptionCategory, pattern, subscriber, permanently = true) + } + } +} + +private trait ActorJMXSupervisor extends Actor with ActorLogging { + + import akka.actor.OneForOneStrategy + import akka.actor.SupervisorStrategy._ + + import scala.concurrent.duration._ + + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + case e: JMRuntimeException ⇒ + log.error(e, "Supervisor strategy STOPPING actor from errors during JMX invocation") + Stop + case e: JMException ⇒ + log.error(e, "Supervisor strategy STOPPING actor from incorrect invocation of JMX registration") + Stop + case t ⇒ + // Use the default supervisor strategy otherwise. + super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate) + } +} + +private class JMXReporterSupervisor extends Actor with ActorLogging with ActorJMXSupervisor { + private val jmxActor = context.actorOf(JMXReporterActor.props, "kamon-jmx-actor") + + def receive = { + case tick: TickMetricSnapshot ⇒ jmxActor ! tick + } +}
\ No newline at end of file diff --git a/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala new file mode 100644 index 00000000..110943f9 --- /dev/null +++ b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala @@ -0,0 +1,212 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= +*/ + +package kamon.jmx + +import java.lang.management.ManagementFactory +import javax.management._ + +import akka.actor.{Actor, Props} +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{Counter, Histogram, InstrumentSnapshot} +import kamon.metric.{Entity, EntitySnapshot} +import org.slf4j.LoggerFactory + +import scala.collection.concurrent.TrieMap + +private object MetricMBeans { + + private implicit class RichHistogramSnapshot(histogram: Histogram.Snapshot) { + def average: Double = { + if (histogram.numberOfMeasurements == 0) 0D + else histogram.sum / histogram.numberOfMeasurements + } + } + + private[jmx] sealed trait MetricMBean + + private[jmx] abstract class AbstractMetricMBean[T <: InstrumentSnapshot] extends MetricMBean { + private[jmx] var snapshot: T + + private[jmx] def objectName: ObjectName + } + + private[jmx] trait HistogramMetricMBean extends AbstractMetricMBean[Histogram.Snapshot] { + def getNumberOfMeasurements: Long + + def getMin: Long + + def get50thPercentile: Long + + def getPercentile(n: Double): Long + + def get70thPercentile: Long + + def get90thPercentile: Long + + def get95thPercentile: Long + + def get990thPercentile: Long + + def get999thPercentile: Long + + def getAvg: Double + + def getMax: Long + + def getSum: Long + } + + private[jmx] class HistogramMetric(@volatile private[jmx] override var snapshot: Histogram.Snapshot, private[jmx] override val objectName: ObjectName) extends HistogramMetricMBean { + override def getNumberOfMeasurements: Long = snapshot.numberOfMeasurements + + override def getMin: Long = snapshot.min + + override def get50thPercentile: Long = snapshot.percentile(50.0D) + + override def get70thPercentile: Long = snapshot.percentile(70.0D) + + override def get90thPercentile: Long = snapshot.percentile(90.0D) + + override def get95thPercentile: Long = snapshot.percentile(95.0D) + + override def get990thPercentile: Long = snapshot.percentile(99.0D) + + override def get999thPercentile: Long = snapshot.percentile(99.9D) + + override def getPercentile(n: Double): Long = snapshot.percentile(n) + + override def getMax: Long = snapshot.max + + override def getSum: Long = snapshot.sum + + override def getAvg: Double = snapshot.average + } + + private[jmx] trait CounterMetricMBean extends AbstractMetricMBean[Counter.Snapshot] { + def getCount: Long + } + + private[jmx] trait HistogramValueMetricMBean extends AbstractMetricMBean[Histogram.Snapshot] { + def getValue: Long + } + + private[jmx] class CounterMetric(@volatile private[jmx] override var snapshot: Counter.Snapshot, private[jmx] override val objectName: ObjectName) extends CounterMetricMBean { + override def getCount: Long = snapshot.count + } + + private[jmx] class HistogramValueMetric(@volatile private[jmx] override var snapshot: Histogram.Snapshot, private[jmx] override val objectName: ObjectName) extends HistogramValueMetricMBean { + override def getValue: Long = snapshot.max + } + + private[jmx] implicit def impCreateHistogramMetric(snapshot: Histogram.Snapshot, objectName: ObjectName) = new HistogramMetric(snapshot, objectName) + + private[jmx] implicit def impCounterMetric(snapshot: Counter.Snapshot, objectName: ObjectName) = new CounterMetric(snapshot, objectName) + +} + +private object MBeanManager { + + import MetricMBeans._ + + private val mbs = ManagementFactory.getPlatformMBeanServer + + private val registeredMBeans = TrieMap.empty[String, AbstractMetricMBean[_]] + + private val log = LoggerFactory.getLogger(getClass) + + private[jmx] def createOrUpdateMBean[M <: AbstractMetricMBean[T], T <: InstrumentSnapshot](group: String, name: String, snapshot: T)(implicit buildMetricMBean: (T, ObjectName) ⇒ M): Unit = { + registeredMBeans.get(name) match { + case Some(mbean: M) ⇒ + mbean.snapshot = snapshot + case None ⇒ + val objectName = new ObjectName(createMBeanName("kamon", group, name)) + val mbean = buildMetricMBean(snapshot, objectName) + registeredMBeans += name -> mbean + mbs.registerMBean(mbean, objectName) + case _ ⇒ throw new IllegalStateException("Illegal metric bean type") + } + } + + private[jmx] def unregisterAllBeans(): Unit = { + registeredMBeans.values.map(_.objectName).foreach(name ⇒ + try { + mbs.unregisterMBean(name) + } catch { + case e: InstanceNotFoundException ⇒ if (log.isTraceEnabled) log.trace(s"Error unregistering $name", e) + case e: MBeanRegistrationException ⇒ if (log.isDebugEnabled) log.debug(s"Error unregistering $name", e) + }) + registeredMBeans.clear() + } + + private def createMBeanName(group: String, `type`: String, name: String, scope: Option[String] = None): String = { + val nameBuilder: StringBuilder = new StringBuilder + nameBuilder.append(group) + nameBuilder.append(":type=") + nameBuilder.append(`type`) + if (scope.isDefined) { + nameBuilder.append(",scope=") + nameBuilder.append(scope.get) + } + if (name.length > 0) { + nameBuilder.append(",name=") + nameBuilder.append(name) + } + nameBuilder.toString + } +} + +private object JMXReporterActor { + + import MBeanManager._ + import MetricMBeans._ + + private[jmx] def props = Props(classOf[JMXReporterActor]) + + private def updateHystogramMetrics(group: String, name: String, hs: Histogram.Snapshot): Unit = { + createOrUpdateMBean[HistogramMetric, Histogram.Snapshot](group, name, hs) + } + + private def updateCounterMetrics(group: String, name: String, cs: Counter.Snapshot): Unit = { + createOrUpdateMBean[CounterMetric, Counter.Snapshot](group, name, cs) + } +} + +private class JMXReporterActor extends Actor { + + import JMXReporterActor._ + import MBeanManager.unregisterAllBeans + + def receive: Receive = { + case tick: TickMetricSnapshot ⇒ + for { + (entity, snapshot) ← tick.metrics + (metricKey, metricSnapshot) ← snapshot.metrics + } { + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + updateHystogramMetrics(entity.category, entity.name + "." + metricKey.name, hs) + case cs: Counter.Snapshot ⇒ + updateCounterMetrics(entity.category, entity.name + "." + metricKey.name, cs) + } + } + } + + override def postStop(): Unit = { + super.postStop() + unregisterAllBeans() + } +}
\ No newline at end of file diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index a8ad9e75..97189008 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -42,5 +42,6 @@ kamon { kamon-datadog.auto-start = yes kamon-log-reporter.auto-start = no kamon-system-metrics.auto-start = no + kamon-jmx.auto-start = yes } } diff --git a/project/Projects.scala b/project/Projects.scala index a68677ef..612e832e 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -23,7 +23,8 @@ object Projects extends Build { lazy val kamon = Project("kamon", file(".")) .aggregate(kamonCore, kamonScala, kamonAkka, kamonSpray, kamonNewrelic, kamonPlayground, kamonTestkit, - kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc, kamonAnnotation, kamonPlay23, kamonPlay24) + kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc, + kamonAnnotation, kamonPlay23, kamonPlay24, kamonJMXReporter) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -104,7 +105,8 @@ object Projects extends Build { lazy val kamonPlayground = Project("kamon-playground", file("kamon-playground")) - .dependsOn(kamonSpray, kamonNewrelic, kamonStatsD, kamonDatadog, kamonLogReporter, kamonSystemMetrics) + .dependsOn(kamonSpray, kamonNewrelic, kamonStatsD, kamonDatadog, kamonLogReporter, kamonSystemMetrics, + kamonJMXReporter) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -215,5 +217,14 @@ object Projects extends Build { compile(sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, akkaSlf4j) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) + lazy val kamonJMXReporter = Project("kamon-jmx", file("kamon-jmx")) + .dependsOn(kamonCore) + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings( + libraryDependencies ++= + compile(akkaActor) ++ + test(scalatest, akkaTestKit, slf4Api, slf4nop)) + val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false) } |