aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego Parra <diegolparra@gmail.com>2015-10-23 19:34:28 -0300
committerDiego Parra <diegolparra@gmail.com>2015-10-23 19:34:28 -0300
commitf3eefdb687358401965fdd793c3a68507e27aa12 (patch)
treec4b04a10ca7a23232e05bca74cc0fa57eea62903
parentccb049c07fc28d25f389be72a242240fec1881a3 (diff)
parent98ff71de1d04c1bb65ca1d6220ddc5a075259248 (diff)
downloadKamon-f3eefdb687358401965fdd793c3a68507e27aa12.tar.gz
Kamon-f3eefdb687358401965fdd793c3a68507e27aa12.tar.bz2
Kamon-f3eefdb687358401965fdd793c3a68507e27aa12.zip
Merge pull request #258 from Tolsi/stolmachev-kamon-jmx
Kamon JMX was added
-rw-r--r--kamon-jmx/src/main/resources/reference.conf30
-rw-r--r--kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala77
-rw-r--r--kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala212
-rw-r--r--kamon-playground/src/main/resources/application.conf1
-rw-r--r--project/Projects.scala15
5 files changed, 333 insertions, 2 deletions
diff --git a/kamon-jmx/src/main/resources/reference.conf b/kamon-jmx/src/main/resources/reference.conf
new file mode 100644
index 00000000..41bf366b
--- /dev/null
+++ b/kamon-jmx/src/main/resources/reference.conf
@@ -0,0 +1,30 @@
+# ========================================== #
+# Kamon-JMX-Reporter Reference Configuration #
+# ========================================== #
+
+kamon {
+
+ jmx {
+ subscriptions {
+ histogram = [ "**" ]
+ min-max-counter = [ "**" ]
+ gauge = [ "**" ]
+ counter = [ "**" ]
+ trace = [ "**" ]
+ trace-segment = [ "**" ]
+ akka-actor = [ "**" ]
+ akka-dispatcher = [ "**" ]
+ akka-router = [ "**" ]
+ system-metric = [ "**" ]
+ http-server = [ "**" ]
+ }
+ }
+
+ modules {
+ kamon-jmx {
+ auto-start = yes
+ requires-aspectj = no
+ extension-id = "kamon.jmx.JMXReporter"
+ }
+ }
+}
diff --git a/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala
new file mode 100644
index 00000000..e1e12aea
--- /dev/null
+++ b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporter.scala
@@ -0,0 +1,77 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+*/
+
+package kamon.jmx
+
+import javax.management.{JMException, JMRuntimeException}
+
+import akka.actor._
+import akka.event.Logging
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.util.ConfigTools.Syntax
+import scala.collection.JavaConverters._
+
+object JMXReporter extends ExtensionId[JMXExtension] with ExtensionIdProvider {
+ override def lookup(): ExtensionId[_ <: Extension] = JMXReporter
+
+ override def createExtension(system: ExtendedActorSystem): JMXExtension = new JMXExtension(system)
+}
+
+class JMXExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ val log = Logging(system, getClass)
+ log.info("Starting the Kamon(JMX) extension")
+
+ val subscriber = system.actorOf(Props[JMXReporterSupervisor], "kamon-jmx-reporter")
+
+ val jmxConfig = system.settings.config.getConfig("kamon.jmx")
+ val subscriptions = jmxConfig.getConfig("subscriptions")
+
+ subscriptions.firstLevelKeys foreach { subscriptionCategory ⇒
+ subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒
+ Kamon.metrics.subscribe(subscriptionCategory, pattern, subscriber, permanently = true)
+ }
+ }
+}
+
+private trait ActorJMXSupervisor extends Actor with ActorLogging {
+
+ import akka.actor.OneForOneStrategy
+ import akka.actor.SupervisorStrategy._
+
+ import scala.concurrent.duration._
+
+ override val supervisorStrategy =
+ OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
+ case e: JMRuntimeException ⇒
+ log.error(e, "Supervisor strategy STOPPING actor from errors during JMX invocation")
+ Stop
+ case e: JMException ⇒
+ log.error(e, "Supervisor strategy STOPPING actor from incorrect invocation of JMX registration")
+ Stop
+ case t ⇒
+ // Use the default supervisor strategy otherwise.
+ super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
+ }
+}
+
+private class JMXReporterSupervisor extends Actor with ActorLogging with ActorJMXSupervisor {
+ private val jmxActor = context.actorOf(JMXReporterActor.props, "kamon-jmx-actor")
+
+ def receive = {
+ case tick: TickMetricSnapshot ⇒ jmxActor ! tick
+ }
+} \ No newline at end of file
diff --git a/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala
new file mode 100644
index 00000000..110943f9
--- /dev/null
+++ b/kamon-jmx/src/main/scala/kamon/jmx/JMXReporterActor.scala
@@ -0,0 +1,212 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+*/
+
+package kamon.jmx
+
+import java.lang.management.ManagementFactory
+import javax.management._
+
+import akka.actor.{Actor, Props}
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.instrument.{Counter, Histogram, InstrumentSnapshot}
+import kamon.metric.{Entity, EntitySnapshot}
+import org.slf4j.LoggerFactory
+
+import scala.collection.concurrent.TrieMap
+
+private object MetricMBeans {
+
+ private implicit class RichHistogramSnapshot(histogram: Histogram.Snapshot) {
+ def average: Double = {
+ if (histogram.numberOfMeasurements == 0) 0D
+ else histogram.sum / histogram.numberOfMeasurements
+ }
+ }
+
+ private[jmx] sealed trait MetricMBean
+
+ private[jmx] abstract class AbstractMetricMBean[T <: InstrumentSnapshot] extends MetricMBean {
+ private[jmx] var snapshot: T
+
+ private[jmx] def objectName: ObjectName
+ }
+
+ private[jmx] trait HistogramMetricMBean extends AbstractMetricMBean[Histogram.Snapshot] {
+ def getNumberOfMeasurements: Long
+
+ def getMin: Long
+
+ def get50thPercentile: Long
+
+ def getPercentile(n: Double): Long
+
+ def get70thPercentile: Long
+
+ def get90thPercentile: Long
+
+ def get95thPercentile: Long
+
+ def get990thPercentile: Long
+
+ def get999thPercentile: Long
+
+ def getAvg: Double
+
+ def getMax: Long
+
+ def getSum: Long
+ }
+
+ private[jmx] class HistogramMetric(@volatile private[jmx] override var snapshot: Histogram.Snapshot, private[jmx] override val objectName: ObjectName) extends HistogramMetricMBean {
+ override def getNumberOfMeasurements: Long = snapshot.numberOfMeasurements
+
+ override def getMin: Long = snapshot.min
+
+ override def get50thPercentile: Long = snapshot.percentile(50.0D)
+
+ override def get70thPercentile: Long = snapshot.percentile(70.0D)
+
+ override def get90thPercentile: Long = snapshot.percentile(90.0D)
+
+ override def get95thPercentile: Long = snapshot.percentile(95.0D)
+
+ override def get990thPercentile: Long = snapshot.percentile(99.0D)
+
+ override def get999thPercentile: Long = snapshot.percentile(99.9D)
+
+ override def getPercentile(n: Double): Long = snapshot.percentile(n)
+
+ override def getMax: Long = snapshot.max
+
+ override def getSum: Long = snapshot.sum
+
+ override def getAvg: Double = snapshot.average
+ }
+
+ private[jmx] trait CounterMetricMBean extends AbstractMetricMBean[Counter.Snapshot] {
+ def getCount: Long
+ }
+
+ private[jmx] trait HistogramValueMetricMBean extends AbstractMetricMBean[Histogram.Snapshot] {
+ def getValue: Long
+ }
+
+ private[jmx] class CounterMetric(@volatile private[jmx] override var snapshot: Counter.Snapshot, private[jmx] override val objectName: ObjectName) extends CounterMetricMBean {
+ override def getCount: Long = snapshot.count
+ }
+
+ private[jmx] class HistogramValueMetric(@volatile private[jmx] override var snapshot: Histogram.Snapshot, private[jmx] override val objectName: ObjectName) extends HistogramValueMetricMBean {
+ override def getValue: Long = snapshot.max
+ }
+
+ private[jmx] implicit def impCreateHistogramMetric(snapshot: Histogram.Snapshot, objectName: ObjectName) = new HistogramMetric(snapshot, objectName)
+
+ private[jmx] implicit def impCounterMetric(snapshot: Counter.Snapshot, objectName: ObjectName) = new CounterMetric(snapshot, objectName)
+
+}
+
+private object MBeanManager {
+
+ import MetricMBeans._
+
+ private val mbs = ManagementFactory.getPlatformMBeanServer
+
+ private val registeredMBeans = TrieMap.empty[String, AbstractMetricMBean[_]]
+
+ private val log = LoggerFactory.getLogger(getClass)
+
+ private[jmx] def createOrUpdateMBean[M <: AbstractMetricMBean[T], T <: InstrumentSnapshot](group: String, name: String, snapshot: T)(implicit buildMetricMBean: (T, ObjectName) ⇒ M): Unit = {
+ registeredMBeans.get(name) match {
+ case Some(mbean: M) ⇒
+ mbean.snapshot = snapshot
+ case None ⇒
+ val objectName = new ObjectName(createMBeanName("kamon", group, name))
+ val mbean = buildMetricMBean(snapshot, objectName)
+ registeredMBeans += name -> mbean
+ mbs.registerMBean(mbean, objectName)
+ case _ ⇒ throw new IllegalStateException("Illegal metric bean type")
+ }
+ }
+
+ private[jmx] def unregisterAllBeans(): Unit = {
+ registeredMBeans.values.map(_.objectName).foreach(name ⇒
+ try {
+ mbs.unregisterMBean(name)
+ } catch {
+ case e: InstanceNotFoundException ⇒ if (log.isTraceEnabled) log.trace(s"Error unregistering $name", e)
+ case e: MBeanRegistrationException ⇒ if (log.isDebugEnabled) log.debug(s"Error unregistering $name", e)
+ })
+ registeredMBeans.clear()
+ }
+
+ private def createMBeanName(group: String, `type`: String, name: String, scope: Option[String] = None): String = {
+ val nameBuilder: StringBuilder = new StringBuilder
+ nameBuilder.append(group)
+ nameBuilder.append(":type=")
+ nameBuilder.append(`type`)
+ if (scope.isDefined) {
+ nameBuilder.append(",scope=")
+ nameBuilder.append(scope.get)
+ }
+ if (name.length > 0) {
+ nameBuilder.append(",name=")
+ nameBuilder.append(name)
+ }
+ nameBuilder.toString
+ }
+}
+
+private object JMXReporterActor {
+
+ import MBeanManager._
+ import MetricMBeans._
+
+ private[jmx] def props = Props(classOf[JMXReporterActor])
+
+ private def updateHystogramMetrics(group: String, name: String, hs: Histogram.Snapshot): Unit = {
+ createOrUpdateMBean[HistogramMetric, Histogram.Snapshot](group, name, hs)
+ }
+
+ private def updateCounterMetrics(group: String, name: String, cs: Counter.Snapshot): Unit = {
+ createOrUpdateMBean[CounterMetric, Counter.Snapshot](group, name, cs)
+ }
+}
+
+private class JMXReporterActor extends Actor {
+
+ import JMXReporterActor._
+ import MBeanManager.unregisterAllBeans
+
+ def receive: Receive = {
+ case tick: TickMetricSnapshot ⇒
+ for {
+ (entity, snapshot) ← tick.metrics
+ (metricKey, metricSnapshot) ← snapshot.metrics
+ } {
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ updateHystogramMetrics(entity.category, entity.name + "." + metricKey.name, hs)
+ case cs: Counter.Snapshot ⇒
+ updateCounterMetrics(entity.category, entity.name + "." + metricKey.name, cs)
+ }
+ }
+ }
+
+ override def postStop(): Unit = {
+ super.postStop()
+ unregisterAllBeans()
+ }
+} \ No newline at end of file
diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf
index a8ad9e75..97189008 100644
--- a/kamon-playground/src/main/resources/application.conf
+++ b/kamon-playground/src/main/resources/application.conf
@@ -42,5 +42,6 @@ kamon {
kamon-datadog.auto-start = yes
kamon-log-reporter.auto-start = no
kamon-system-metrics.auto-start = no
+ kamon-jmx.auto-start = yes
}
}
diff --git a/project/Projects.scala b/project/Projects.scala
index a68677ef..612e832e 100644
--- a/project/Projects.scala
+++ b/project/Projects.scala
@@ -23,7 +23,8 @@ object Projects extends Build {
lazy val kamon = Project("kamon", file("."))
.aggregate(kamonCore, kamonScala, kamonAkka, kamonSpray, kamonNewrelic, kamonPlayground, kamonTestkit,
- kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc, kamonAnnotation, kamonPlay23, kamonPlay24)
+ kamonStatsD, kamonDatadog, kamonSPM, kamonSystemMetrics, kamonLogReporter, kamonAkkaRemote, kamonJdbc,
+ kamonAnnotation, kamonPlay23, kamonPlay24, kamonJMXReporter)
.settings(basicSettings: _*)
.settings(formatSettings: _*)
.settings(noPublishing: _*)
@@ -104,7 +105,8 @@ object Projects extends Build {
lazy val kamonPlayground = Project("kamon-playground", file("kamon-playground"))
- .dependsOn(kamonSpray, kamonNewrelic, kamonStatsD, kamonDatadog, kamonLogReporter, kamonSystemMetrics)
+ .dependsOn(kamonSpray, kamonNewrelic, kamonStatsD, kamonDatadog, kamonLogReporter, kamonSystemMetrics,
+ kamonJMXReporter)
.settings(basicSettings: _*)
.settings(formatSettings: _*)
.settings(noPublishing: _*)
@@ -215,5 +217,14 @@ object Projects extends Build {
compile(sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic, akkaSlf4j) ++
test(scalatest, akkaTestKit, slf4Api, slf4nop))
+ lazy val kamonJMXReporter = Project("kamon-jmx", file("kamon-jmx"))
+ .dependsOn(kamonCore)
+ .settings(basicSettings: _*)
+ .settings(formatSettings: _*)
+ .settings(
+ libraryDependencies ++=
+ compile(akkaActor) ++
+ test(scalatest, akkaTestKit, slf4Api, slf4nop))
+
val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false)
}