From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../kamon/newrelic/CustomMetricExtractor.scala | 18 +++--- .../main/scala/kamon/newrelic/JsonProtocol.scala | 2 +- .../src/main/scala/kamon/newrelic/Metric.scala | 25 +++++--- .../main/scala/kamon/newrelic/MetricReporter.scala | 18 ++---- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 6 +- .../newrelic/WebTransactionMetricExtractor.scala | 71 +++++++++++----------- .../src/test/scala/kamon/newrelic/AgentSpec.scala | 2 +- .../scala/kamon/newrelic/MetricReporterSpec.scala | 24 ++++---- 8 files changed, 86 insertions(+), 80 deletions(-) (limited to 'kamon-newrelic') diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala index e97c24dc..551bb546 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala @@ -16,17 +16,17 @@ package kamon.newrelic -import kamon.metric.UserMetrics.UserMetricGroup -import kamon.metric._ +import kamon.metric.{ UserMetrics, EntitySnapshot, Entity } +import kamon.metric.instrument.CollectionContext object CustomMetricExtractor extends MetricExtractor { - def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { - metrics.collect { - case (mg: UserMetricGroup, groupSnapshot) ⇒ - groupSnapshot.metrics collect { - case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit) - } - }.flatten.toMap + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = { + metrics.get(UserMetrics.entity).map { allUserMetrics ⇒ + allUserMetrics.metrics.map { + case (key, snapshot) ⇒ Metric(snapshot, key.unitOfMeasurement, s"Custom/${key.name}", None) + } + + } getOrElse (Map.empty) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index 0e53be0b..6e16b975 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -15,7 +15,7 @@ * ========================================================== */ package kamon.newrelic -import kamon.Timestamp +import kamon.util.Timestamp import spray.json._ object JsonProtocol extends DefaultJsonProtocol { diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala index 52d21f31..20204b79 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -1,8 +1,8 @@ package kamon.newrelic -import kamon.Timestamp -import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metric.{ MetricSnapshot, Scale } +import kamon.metric.instrument._ +import kamon.metric.MetricKey +import kamon.util.{ MapMerge, Timestamp } case class MetricID(name: String, scope: Option[String]) case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) { @@ -18,16 +18,23 @@ case class MetricData(callCount: Long, total: Double, totalExclusive: Double, mi object Metric { - def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = { + def scaleFunction(uom: UnitOfMeasurement): Long ⇒ Double = uom match { + case time: Time ⇒ time.scale(Time.Seconds) + case other ⇒ _.toDouble + } + + def apply(snapshot: InstrumentSnapshot, snapshotUnit: UnitOfMeasurement, name: String, scope: Option[String]): Metric = { snapshot match { case hs: Histogram.Snapshot ⇒ var total: Double = 0D var sumOfSquares: Double = 0D - val scaledMin = Scale.convert(hs.scale, targetScale, hs.min) - val scaledMax = Scale.convert(hs.scale, targetScale, hs.max) + val scaler = scaleFunction(snapshotUnit) + + val scaledMin = scaler(hs.min) + val scaledMax = scaler(hs.max) hs.recordsIterator.foreach { record ⇒ - val scaledValue = Scale.convert(hs.scale, targetScale, record.level) + val scaledValue = scaler(record.level) total += scaledValue * record.count sumOfSquares += (scaledValue * scaledValue) * record.count @@ -42,12 +49,12 @@ object Metric { } case class TimeSliceMetrics(from: Timestamp, to: Timestamp, metrics: Map[MetricID, MetricData]) { - import kamon.metric.combineMaps + import MapMerge.Syntax def merge(that: TimeSliceMetrics): TimeSliceMetrics = { val mergedFrom = Timestamp.earlier(from, that.from) val mergedTo = Timestamp.later(to, that.to) - val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)) + val mergedMetrics = metrics.merge(that.metrics, (l, r) ⇒ l.merge(r)) TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index 286b0a77..51c1ad21 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -6,9 +6,9 @@ import akka.actor.{ Props, ActorLogging, Actor } import akka.pattern.pipe import akka.io.IO import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import kamon.metric._ +import kamon.metric.instrument.CollectionContext import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException } import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded } import spray.can.Http @@ -22,7 +22,7 @@ class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging wi val metricsExtension = Kamon(Metrics)(context.system) val collectionContext = metricsExtension.buildDefaultCollectionContext val metricsSubscriber = { - val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS) + val tickInterval = context.system.settings.config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS) // Metrics are always sent to New Relic in 60 seconds intervals. if (tickInterval == 60000) self @@ -91,14 +91,8 @@ class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging wi } def subscribeToMetrics(): Unit = { - // Subscribe to Trace Metrics - metricsExtension.subscribe(TraceMetrics, "*", metricsSubscriber, permanently = true) - - // Subscribe to all User Metrics - metricsExtension.subscribe(UserHistograms, "*", metricsSubscriber, permanently = true) - metricsExtension.subscribe(UserCounters, "*", metricsSubscriber, permanently = true) - metricsExtension.subscribe(UserMinMaxCounters, "*", metricsSubscriber, permanently = true) - metricsExtension.subscribe(UserGauges, "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe("trace", "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe("user-metrics", "*", metricsSubscriber, permanently = true) } } @@ -113,5 +107,5 @@ object MetricReporter { } trait MetricExtractor { - def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 56b29aff..7f56d931 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -21,8 +21,8 @@ import java.util import akka.actor.{ Actor, ActorLogging } import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } import com.newrelic.api.agent.{ NewRelic ⇒ NR } -import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey } -import kamon.trace.{ TraceLocal, TraceRecorder, TraceContextAware } +import kamon.trace.TraceLocal.HttpContextKey +import kamon.trace.{ TraceContext, TraceLocal, TraceContextAware } trait CustomParamsSupport { this: NewRelicErrorLogger ⇒ @@ -64,7 +64,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo //Really ugly, but temporal hack until next release... def runInFakeTransaction[T](thunk: ⇒ T): T = { val oldName = Thread.currentThread.getName - Thread.currentThread.setName(TraceRecorder.currentContext.name) + Thread.currentThread.setName(TraceContext.currentContext.name) try thunk finally Thread.currentThread.setName(oldName) } } \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index baf20434..d0144f4b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,78 +16,81 @@ package kamon.newrelic +import kamon.metric.{ EntitySnapshot, Entity } + import scala.collection.mutable -import kamon.metric._ -import kamon.metric.TraceMetrics.TraceMetricsSnapshot -import kamon.metric.instrument.Histogram -import kamon.trace.SegmentCategory.HttpClient -import kamon.trace.SegmentMetricIdentity +import kamon.metric.instrument.{ Time, CollectionContext, Histogram } object WebTransactionMetricExtractor extends MetricExtractor { - def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = { val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT) // Trace metrics are recorded in nanoseconds. - var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) + var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty + var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] - val transactionMetrics = metrics.collect { - case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ - - tms.segments.foreach { - case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ - accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) + val transactionMetrics = metrics.filterKeys(_.category == "trace").map { + case (entity: Entity, es: EntitySnapshot) ⇒ + // Trace metrics only have elapsed-time and segments and all of them are Histograms. + es.histograms.foreach { + case (key, segmentSnapshot) if key.metadata.get("category").filter(_ == "http-client").nonEmpty ⇒ + val library = key.metadata("library") + accumulatedExternalServices = accumulatedExternalServices.merge(segmentSnapshot, collectionContext) // Accumulate externals by host - externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + externalByHostSnapshots.update(key.name, segmentSnapshot :: externalByHostSnapshots.getOrElse(key.name, Nil)) // Accumulate externals by host and library - externalByHostAndLibrarySnapshots.update((segmentName, library), - snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + externalByHostAndLibrarySnapshots.update((key.name, library), + segmentSnapshot :: externalByHostAndLibrarySnapshots.getOrElse((key.name, library), Nil)) // Accumulate externals by host and library, including the transaction as scope. - externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), - snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + externalScopedByHostAndLibrarySnapshots.update((key.name, library, entity.name), + segmentSnapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((key.name, library, entity.name), Nil)) - case otherSegments ⇒ // Ignore other kinds of segments. - } + case otherSegments ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) - tms.elapsedTime.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) } - Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) - } + es.histograms.collect { + case (key, elapsedTime) if key.name == "elapsed-time" ⇒ + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(elapsedTime, collectionContext) + elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Time.Nanoseconds.scale(Time.Seconds)(record.level), record.count) + } + + Metric(elapsedTime, key.unitOfMeasurement, "WebTransaction/Custom/" + entity.name, None) + } + } flatten - val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) + val httpDispatcher = Metric(accumulatedHttpDispatcher, Time.Seconds, "HttpDispatcher", None) val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) - val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAllWeb = Metric(accumulatedExternalServices, Time.Seconds, "External/allWeb", None) val externalAll = externalAllWeb.copy(MetricID("External/all", None)) val externalByHost = externalByHostSnapshots.map { case (host, snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/all", None) } val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { case ((host, library), snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", None) } val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { case ((host, library, traceName), snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName)) } Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala index adab1a34..05d3533b 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -22,7 +22,6 @@ import akka.actor.{ ActorRef, ActorSystem, Props } import akka.io.IO import akka.testkit._ import com.typesafe.config.ConfigFactory -import kamon.AkkaExtensionSwap import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } import spray.can.Http import spray.http._ @@ -30,6 +29,7 @@ import spray.httpx.encoding.Deflate import spray.httpx.{ SprayJsonSupport, RequestBuilding } import spray.json.JsArray import spray.json._ +import testkit.AkkaExtensionSwap class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll with RequestBuilding with SprayJsonSupport { import JsonProtocol._ diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala index ff977398..13ccbae3 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -21,15 +21,17 @@ import akka.io.IO import akka.testkit._ import akka.util.Timeout import com.typesafe.config.ConfigFactory -import kamon.metric.{ TraceMetrics, Metrics } -import kamon.{ MilliTimestamp, Kamon, AkkaExtensionSwap } -import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.{ Entity, Metrics, TraceMetrics } +import kamon.util.MilliTimestamp +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import org.scalatest.{ Matchers, WordSpecLike } import spray.can.Http import spray.http.Uri.Query import spray.http._ import spray.httpx.encoding.Deflate import spray.httpx.{ RequestBuilding, SprayJsonSupport } +import testkit.AkkaExtensionSwap import scala.concurrent.duration._ import spray.json._ @@ -133,20 +135,20 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers wit } trait FakeTickSnapshotsFixture { - val testTraceID = TraceMetrics("example-trace") - val recorder = Kamon(Metrics).register(testTraceID, TraceMetrics.Factory).get + val testTraceID = Entity("example-trace", "trace") + val recorder = Kamon(Metrics).register(TraceMetrics, testTraceID.name).get.recorder val collectionContext = Kamon(Metrics).buildDefaultCollectionContext def collectRecorder = recorder.collect(collectionContext) - recorder.elapsedTime.record(1000000) - recorder.elapsedTime.record(2000000) - recorder.elapsedTime.record(3000000) + recorder.ElapsedTime.record(1000000) + recorder.ElapsedTime.record(2000000) + recorder.ElapsedTime.record(3000000) val firstSnapshot = TickMetricSnapshot(new MilliTimestamp(1415587618000L), new MilliTimestamp(1415587678000L), Map(testTraceID -> collectRecorder)) - recorder.elapsedTime.record(6000000) - recorder.elapsedTime.record(5000000) - recorder.elapsedTime.record(4000000) + recorder.ElapsedTime.record(6000000) + recorder.ElapsedTime.record(5000000) + recorder.ElapsedTime.record(4000000) val secondSnapshot = TickMetricSnapshot(new MilliTimestamp(1415587678000L), new MilliTimestamp(1415587738000L), Map(testTraceID -> collectRecorder)) } } \ No newline at end of file -- cgit v1.2.3