diff options
12 files changed, 178 insertions, 54 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 199b2bb2..6762fb10 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -27,7 +27,7 @@ import kamon.metrics.ActorMetrics.ActorMetricRecorder @Aspect("perthis(actorCellCreation(*, *, *, *, *))") class BehaviourInvokeTracing { - var path: String = _ + var metricIdentity: ActorMetrics = _ var actorMetrics: Option[ActorMetricRecorder] = None @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") @@ -37,8 +37,8 @@ class BehaviourInvokeTracing { def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { val metricsExtension = Kamon(Metrics)(system) - path = ref.path.elements.mkString("/") - actorMetrics = metricsExtension.register(path, ActorMetrics) + metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) + actorMetrics = metricsExtension.register(metricIdentity, ActorMetrics.Factory) } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -65,7 +65,7 @@ class BehaviourInvokeTracing { @After("actorStop(cell)") def afterStop(cell: Cell): Unit = { - actorMetrics.map(p ⇒ Kamon(Metrics)(cell.system).unregister(path, ActorMetrics)) + actorMetrics.map(p ⇒ Kamon(Metrics)(cell.system).unregister(metricIdentity)) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index a00b443e..4c8752e5 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -20,9 +20,12 @@ import com.typesafe.config.Config import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder import org.HdrHistogram.HighDynamicRangeRecorder -object ActorMetrics extends MetricGroupIdentity.Category with MetricGroupFactory { - type GroupRecorder = ActorMetricRecorder - val entityName = "actor" +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" case object ProcessingTime extends MetricIdentity { val name, tag = "ProcessingTime" } case object MailboxSize extends MetricIdentity { val name, tag = "MailboxSize" } @@ -51,17 +54,21 @@ object ActorMetrics extends MetricGroupIdentity.Category with MetricGroupFactory (TimeInMailbox -> timeInMailbox)) } - def create(config: Config): ActorMetricRecorder = { - import HighDynamicRangeRecorder.Configuration + val Factory = new MetricGroupFactory { + type GroupRecorder = ActorMetricRecorder - val settings = config.getConfig("kamon.metrics.precision.actor") - val processingTimeHdrConfig = Configuration.fromConfig(settings.getConfig("processing-time")) - val mailboxSizeHdrConfig = Configuration.fromConfig(settings.getConfig("mailbox-size")) - val timeInMailboxHdrConfig = Configuration.fromConfig(settings.getConfig("time-in-mailbox")) + def create(config: Config): ActorMetricRecorder = { + import HighDynamicRangeRecorder.Configuration - new ActorMetricRecorder( - HighDynamicRangeRecorder(processingTimeHdrConfig), - ContinuousHighDynamicRangeRecorder(mailboxSizeHdrConfig), - HighDynamicRangeRecorder(timeInMailboxHdrConfig)) + val settings = config.getConfig("kamon.metrics.precision.actor") + val processingTimeHdrConfig = Configuration.fromConfig(settings.getConfig("processing-time")) + val mailboxSizeHdrConfig = Configuration.fromConfig(settings.getConfig("mailbox-size")) + val timeInMailboxHdrConfig = Configuration.fromConfig(settings.getConfig("time-in-mailbox")) + + new ActorMetricRecorder( + HighDynamicRangeRecorder(processingTimeHdrConfig), + ContinuousHighDynamicRangeRecorder(mailboxSizeHdrConfig), + HighDynamicRangeRecorder(timeInMailboxHdrConfig)) + } } } diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala index 2a9c5c65..ee32dbe8 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -20,7 +20,14 @@ import annotation.tailrec import com.typesafe.config.Config import kamon.metrics.MetricSnapshot.Measurement -case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category) +trait MetricGroupCategory { + def name: String +} + +trait MetricGroupIdentity { + def name: String + def category: MetricGroupCategory +} trait MetricIdentity { def name: String @@ -96,14 +103,7 @@ object MetricSnapshot { case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot object MetricGroupIdentity { - trait Category { - def entityName: String - } - val AnyCategory = new Category { - val entityName: String = "match-all" - override def equals(that: Any): Boolean = that.isInstanceOf[Category] - } } trait MetricGroupFactory { diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index a717e25a..a8b90b57 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -22,9 +22,7 @@ import com.typesafe.config.Config import kamon.util.GlobPathFilter import kamon.Kamon import akka.actor -import kamon.metrics.MetricGroupIdentity.Category import kamon.metrics.Metrics.MetricGroupFilter -import scala.Some import kamon.metrics.Subscriptions.Subscribe class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension { @@ -33,18 +31,18 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension val filters = loadFilters(config) lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") - def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.GroupRecorder] = { - if (shouldTrack(name, category)) - Some(storage.getOrElseUpdate(MetricGroupIdentity(name, category), category.create(config)).asInstanceOf[category.GroupRecorder]) + def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { + if (shouldTrack(identity)) + Some(storage.getOrElseUpdate(identity, factory.create(config)).asInstanceOf[factory.GroupRecorder]) else None } - def unregister(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Unit = { - storage.remove(MetricGroupIdentity(name, category)) + def unregister(identity: MetricGroupIdentity): Unit = { + storage.remove(identity) } - def subscribe(category: Category, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { + def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { subscriptions.tell(Subscribe(category, selection, permanently), receiver) } @@ -52,8 +50,8 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap } - private def shouldTrack(name: String, category: MetricGroupIdentity.Category): Boolean = { - filters.get(category.entityName).map(filter ⇒ filter.accept(name)).getOrElse(false) + private def shouldTrack(identity: MetricGroupIdentity): Boolean = { + filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false) } def loadFilters(config: Config): Map[String, MetricGroupFilter] = { diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 3151bdc1..654c37b0 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -21,7 +21,6 @@ import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetric import kamon.util.GlobPathFilter import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit -import kamon.metrics.MetricGroupIdentity.Category import kamon.Kamon class Subscriptions extends Actor { @@ -40,7 +39,7 @@ class Subscriptions extends Actor { case FlushMetrics ⇒ flush() } - def subscribe(category: Category, selection: String, permanent: Boolean): Unit = { + def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) if (permanent) { val receivers = subscribedPermanently.get(filter).getOrElse(Nil) @@ -78,10 +77,10 @@ class Subscriptions extends Actor { object Subscriptions { case object FlushMetrics - case class Subscribe(category: Category, selection: String, permanently: Boolean = false) + case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) { + case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { def accept(identity: MetricGroupIdentity): Boolean = { category.equals(identity.category) && globFilter.accept(identity.name) } diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala index 66801481..ccffe382 100644 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -20,12 +20,14 @@ import org.HdrHistogram.HighDynamicRangeRecorder import scala.collection.concurrent.TrieMap import com.typesafe.config.Config -object TraceMetrics extends MetricGroupIdentity.Category with MetricGroupFactory { - type GroupRecorder = TraceMetricRecorder - val entityName = "trace" +case class TraceMetrics(name: String) extends MetricGroupIdentity { + val category = TraceMetrics +} - case object ElapsedTime extends MetricIdentity { val name, tag = "ElapsedTime" } +object TraceMetrics extends MetricGroupCategory { + val name = "trace" + case object ElapsedTime extends MetricIdentity { val name, tag = "ElapsedTime" } case class HttpClientRequest(name: String, tag: String) extends MetricIdentity class TraceMetricRecorder(val elapsedTime: HighDynamicRangeRecorder, private val segmentRecorderFactory: () ⇒ HighDynamicRangeRecorder) @@ -48,14 +50,18 @@ object TraceMetrics extends MetricGroupIdentity.Category with MetricGroupFactory def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) } - def create(config: Config): TraceMetricRecorder = { - import HighDynamicRangeRecorder.Configuration + val Factory = new MetricGroupFactory { + type GroupRecorder = TraceMetricRecorder - val settings = config.getConfig("kamon.metrics.precision.trace") - val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) - val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) + def create(config: Config): TraceMetricRecorder = { + import HighDynamicRangeRecorder.Configuration - new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + val settings = config.getConfig("kamon.metrics.precision.trace") + val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) + val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) + + new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + } } } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index d3759a26..dd4a25f8 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -89,7 +89,7 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok def finish(metadata: Map[String, String]): Unit = { _isOpen = false val finishMark = System.nanoTime() - val metricRecorder = metricsExtension.register(name, TraceMetrics) + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ traceMetrics.elapsedTime.record(finishMark - startMark) @@ -108,7 +108,7 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok finishedSegments.add(SegmentData(identity, duration, metadata)) if (!_isOpen) { - metricsExtension.register(name, TraceMetrics).map { traceMetrics ⇒ + metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ drainFinishedSegments(traceMetrics) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala new file mode 100644 index 00000000..dc6f0868 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -0,0 +1,32 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double, + min: Double, max: Double, sumOfSquares: Double) { + + def merge(that: Metric): Metric = { + Metric(name, scope, + callCount + that.callCount, + total + that.total, + totalExclusive + that.totalExclusive, + math.min(min, that.min), + math.max(max, that.max), + sumOfSquares + that.sumOfSquares) + } + +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index 5f736a90..57be566c 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -35,7 +35,7 @@ class NewRelicManager extends Actor with ActorLogging { //Kamon(Trace)(context.system).api ! Trace.Register - val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics") + val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics2], "web-transaction-metrics") val agent = context.actorOf(Props[Agent], "agent") import context.dispatcher @@ -70,7 +70,7 @@ object NewRelicMetric { case class MetricBatch(metrics: List[(ID, Data)]) } -class WebTransactionMetrics extends Actor with ActorLogging { +class WebTransactionMetrics2 extends Actor with ActorLogging { val apdexT = 0.5D var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala new file mode 100644 index 00000000..e10e9271 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala @@ -0,0 +1,34 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import kamon.metrics.{ TraceMetrics, MetricGroupSnapshot, MetricGroupIdentity } + +object WebTransactionMetrics { + def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): List[Metric] = { + metrics.collect { + case (TraceMetrics(name), groupSnapshot) ⇒ + + + groupSnapshot.metrics foreach { + case (metricIdentity, snapshot) => println(s"[$name] - ${toNewRelicMetric(name, None, snapshot)}") + } + } + + Nil + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala new file mode 100644 index 00000000..f0b28d95 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala @@ -0,0 +1,45 @@ +/* + * + * * ========================================================================================= + * * Copyright © 2013 the kamon project <http://kamon.io/> + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * * except in compliance with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software distributed under the + * * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * * either express or implied. See the License for the specific language governing permissions + * * and limitations under the License. + * * ========================================================================================= + * + */ + +package kamon + +import kamon.metrics.MetricSnapshot + +package object newrelic { + + def toNewRelicMetric(name: String, scope: Option[String], snapshot: MetricSnapshot): Metric = { + var total: Double = 0D + var sumOfSquares: Double = 0D + + val measurementLevels = snapshot.measurementLevels.iterator + while(measurementLevels.hasNext) { + val level = measurementLevels.next() + + // NewRelic metrics need to be scaled to seconds. + val scaledValue = level.value / 1E9D + + total += scaledValue + sumOfSquares += scaledValue * scaledValue + } + + val scaledMin = snapshot.min / 1E9D + val scaledMax = snapshot.max / 1E9D + + Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) + } +} diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index bd85ea7d..cd497ca5 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -27,6 +27,8 @@ import kamon.trace.TraceRecorder import kamon.Kamon import kamon.metrics.{ ActorMetrics, TraceMetrics, Metrics } import spray.http.{ StatusCodes, Uri } +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.newrelic.WebTransactionMetrics object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { import scala.concurrent.duration._ @@ -42,7 +44,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil def receive: Actor.Receive = { case any ⇒ sender ! any } }), "com") - //Kamon(Metrics).subscribe(TraceMetrics, "*", printer, permanently = true) + Kamon(Metrics).subscribe(TraceMetrics, "*", printer, permanently = true) //Kamon(Metrics).subscribe(ActorMetrics, "*", printer, permanently = true) implicit val timeout = Timeout(30 seconds) @@ -107,6 +109,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil class PrintWhatever extends Actor { def receive = { + case tick: TickMetricSnapshot => WebTransactionMetrics.collectWebTransactionMetrics(tick.metrics) case anything ⇒ println(anything) } } |