diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-newrelic/src/main/scala | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-newrelic/src/main/scala')
6 files changed, 72 insertions, 68 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala index e97c24dc..551bb546 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/CustomMetricExtractor.scala @@ -16,17 +16,17 @@ package kamon.newrelic -import kamon.metric.UserMetrics.UserMetricGroup -import kamon.metric._ +import kamon.metric.{ UserMetrics, EntitySnapshot, Entity } +import kamon.metric.instrument.CollectionContext object CustomMetricExtractor extends MetricExtractor { - def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { - metrics.collect { - case (mg: UserMetricGroup, groupSnapshot) ⇒ - groupSnapshot.metrics collect { - case (name, snapshot) ⇒ Metric.fromKamonMetricSnapshot(snapshot, s"Custom/${mg.name}", None, Scale.Unit) - } - }.flatten.toMap + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = { + metrics.get(UserMetrics.entity).map { allUserMetrics ⇒ + allUserMetrics.metrics.map { + case (key, snapshot) ⇒ Metric(snapshot, key.unitOfMeasurement, s"Custom/${key.name}", None) + } + + } getOrElse (Map.empty) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index 0e53be0b..6e16b975 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -15,7 +15,7 @@ * ========================================================== */ package kamon.newrelic -import kamon.Timestamp +import kamon.util.Timestamp import spray.json._ object JsonProtocol extends DefaultJsonProtocol { diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala index 52d21f31..20204b79 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala @@ -1,8 +1,8 @@ package kamon.newrelic -import kamon.Timestamp -import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metric.{ MetricSnapshot, Scale } +import kamon.metric.instrument._ +import kamon.metric.MetricKey +import kamon.util.{ MapMerge, Timestamp } case class MetricID(name: String, scope: Option[String]) case class MetricData(callCount: Long, total: Double, totalExclusive: Double, min: Double, max: Double, sumOfSquares: Double) { @@ -18,16 +18,23 @@ case class MetricData(callCount: Long, total: Double, totalExclusive: Double, mi object Metric { - def fromKamonMetricSnapshot(snapshot: MetricSnapshot, name: String, scope: Option[String], targetScale: Scale): Metric = { + def scaleFunction(uom: UnitOfMeasurement): Long ⇒ Double = uom match { + case time: Time ⇒ time.scale(Time.Seconds) + case other ⇒ _.toDouble + } + + def apply(snapshot: InstrumentSnapshot, snapshotUnit: UnitOfMeasurement, name: String, scope: Option[String]): Metric = { snapshot match { case hs: Histogram.Snapshot ⇒ var total: Double = 0D var sumOfSquares: Double = 0D - val scaledMin = Scale.convert(hs.scale, targetScale, hs.min) - val scaledMax = Scale.convert(hs.scale, targetScale, hs.max) + val scaler = scaleFunction(snapshotUnit) + + val scaledMin = scaler(hs.min) + val scaledMax = scaler(hs.max) hs.recordsIterator.foreach { record ⇒ - val scaledValue = Scale.convert(hs.scale, targetScale, record.level) + val scaledValue = scaler(record.level) total += scaledValue * record.count sumOfSquares += (scaledValue * scaledValue) * record.count @@ -42,12 +49,12 @@ object Metric { } case class TimeSliceMetrics(from: Timestamp, to: Timestamp, metrics: Map[MetricID, MetricData]) { - import kamon.metric.combineMaps + import MapMerge.Syntax def merge(that: TimeSliceMetrics): TimeSliceMetrics = { val mergedFrom = Timestamp.earlier(from, that.from) val mergedTo = Timestamp.later(to, that.to) - val mergedMetrics = combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)) + val mergedMetrics = metrics.merge(that.metrics, (l, r) ⇒ l.merge(r)) TimeSliceMetrics(mergedFrom, mergedTo, mergedMetrics) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index 286b0a77..51c1ad21 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -6,9 +6,9 @@ import akka.actor.{ Props, ActorLogging, Actor } import akka.pattern.pipe import akka.io.IO import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import kamon.metric._ +import kamon.metric.instrument.CollectionContext import kamon.newrelic.ApiMethodClient.{ AgentShutdownRequiredException, AgentRestartRequiredException } import kamon.newrelic.MetricReporter.{ PostFailed, PostSucceeded } import spray.can.Http @@ -22,7 +22,7 @@ class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging wi val metricsExtension = Kamon(Metrics)(context.system) val collectionContext = metricsExtension.buildDefaultCollectionContext val metricsSubscriber = { - val tickInterval = context.system.settings.config.getDuration("kamon.metrics.tick-interval", TimeUnit.MILLISECONDS) + val tickInterval = context.system.settings.config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS) // Metrics are always sent to New Relic in 60 seconds intervals. if (tickInterval == 60000) self @@ -91,14 +91,8 @@ class MetricReporter(settings: AgentSettings) extends Actor with ActorLogging wi } def subscribeToMetrics(): Unit = { - // Subscribe to Trace Metrics - metricsExtension.subscribe(TraceMetrics, "*", metricsSubscriber, permanently = true) - - // Subscribe to all User Metrics - metricsExtension.subscribe(UserHistograms, "*", metricsSubscriber, permanently = true) - metricsExtension.subscribe(UserCounters, "*", metricsSubscriber, permanently = true) - metricsExtension.subscribe(UserMinMaxCounters, "*", metricsSubscriber, permanently = true) - metricsExtension.subscribe(UserGauges, "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe("trace", "*", metricsSubscriber, permanently = true) + metricsExtension.subscribe("user-metrics", "*", metricsSubscriber, permanently = true) } } @@ -113,5 +107,5 @@ object MetricReporter { } trait MetricExtractor { - def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 56b29aff..7f56d931 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -21,8 +21,8 @@ import java.util import akka.actor.{ Actor, ActorLogging } import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } import com.newrelic.api.agent.{ NewRelic ⇒ NR } -import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey } -import kamon.trace.{ TraceLocal, TraceRecorder, TraceContextAware } +import kamon.trace.TraceLocal.HttpContextKey +import kamon.trace.{ TraceContext, TraceLocal, TraceContextAware } trait CustomParamsSupport { this: NewRelicErrorLogger ⇒ @@ -64,7 +64,7 @@ class NewRelicErrorLogger extends Actor with ActorLogging with CustomParamsSuppo //Really ugly, but temporal hack until next release... def runInFakeTransaction[T](thunk: ⇒ T): T = { val oldName = Thread.currentThread.getName - Thread.currentThread.setName(TraceRecorder.currentContext.name) + Thread.currentThread.setName(TraceContext.currentContext.name) try thunk finally Thread.currentThread.setName(oldName) } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index baf20434..d0144f4b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,78 +16,81 @@ package kamon.newrelic +import kamon.metric.{ EntitySnapshot, Entity } + import scala.collection.mutable -import kamon.metric._ -import kamon.metric.TraceMetrics.TraceMetricsSnapshot -import kamon.metric.instrument.Histogram -import kamon.trace.SegmentCategory.HttpClient -import kamon.trace.SegmentMetricIdentity +import kamon.metric.instrument.{ Time, CollectionContext, Histogram } object WebTransactionMetricExtractor extends MetricExtractor { - def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): Map[MetricID, MetricData] = { + def extract(settings: AgentSettings, collectionContext: CollectionContext, metrics: Map[Entity, EntitySnapshot]): Map[MetricID, MetricData] = { val apdexBuilder = new ApdexBuilder("Apdex", None, settings.apdexT) // Trace metrics are recorded in nanoseconds. - var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) + var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty + var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] - val transactionMetrics = metrics.collect { - case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ - - tms.segments.foreach { - case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ - accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) + val transactionMetrics = metrics.filterKeys(_.category == "trace").map { + case (entity: Entity, es: EntitySnapshot) ⇒ + // Trace metrics only have elapsed-time and segments and all of them are Histograms. + es.histograms.foreach { + case (key, segmentSnapshot) if key.metadata.get("category").filter(_ == "http-client").nonEmpty ⇒ + val library = key.metadata("library") + accumulatedExternalServices = accumulatedExternalServices.merge(segmentSnapshot, collectionContext) // Accumulate externals by host - externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + externalByHostSnapshots.update(key.name, segmentSnapshot :: externalByHostSnapshots.getOrElse(key.name, Nil)) // Accumulate externals by host and library - externalByHostAndLibrarySnapshots.update((segmentName, library), - snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + externalByHostAndLibrarySnapshots.update((key.name, library), + segmentSnapshot :: externalByHostAndLibrarySnapshots.getOrElse((key.name, library), Nil)) // Accumulate externals by host and library, including the transaction as scope. - externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), - snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + externalScopedByHostAndLibrarySnapshots.update((key.name, library, entity.name), + segmentSnapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((key.name, library, entity.name), Nil)) - case otherSegments ⇒ // Ignore other kinds of segments. - } + case otherSegments ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) - tms.elapsedTime.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) } - Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) - } + es.histograms.collect { + case (key, elapsedTime) if key.name == "elapsed-time" ⇒ + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(elapsedTime, collectionContext) + elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Time.Nanoseconds.scale(Time.Seconds)(record.level), record.count) + } + + Metric(elapsedTime, key.unitOfMeasurement, "WebTransaction/Custom/" + entity.name, None) + } + } flatten - val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) + val httpDispatcher = Metric(accumulatedHttpDispatcher, Time.Seconds, "HttpDispatcher", None) val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) - val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAllWeb = Metric(accumulatedExternalServices, Time.Seconds, "External/allWeb", None) val externalAll = externalAllWeb.copy(MetricID("External/all", None)) val externalByHost = externalByHostSnapshots.map { case (host, snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/all", None) } val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { case ((host, library), snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", None) } val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { case ((host, library, traceName), snapshots) ⇒ - val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) - Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty)(_.merge(_, collectionContext)) + Metric(mergedSnapshots, Time.Seconds, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName)) } Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ |