From 5b9bbb196734c47e67d69d48e378e196b205fd57 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 10 Nov 2014 04:22:05 +0100 Subject: + newrelic: report additional and scoped external service metrics, improves #63. --- .../main/scala/kamon/newrelic/JsonProtocol.scala | 53 +++++++++++++++-- .../main/scala/kamon/newrelic/MetricReporter.scala | 4 +- .../scala/kamon/newrelic/NewRelicJsonPrinter.scala | 59 +++++++++++++++++++ .../newrelic/WebTransactionMetricExtractor.scala | 67 ++++++++++++++++------ 4 files changed, 157 insertions(+), 26 deletions(-) create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic') diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index c573d04d..26e8839e 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -32,18 +32,46 @@ object JsonProtocol extends DefaultJsonProtocol { "pid" -> JsNumber(obj.pid))) } - implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] { + implicit def seqWriter[T: JsonFormat] = new JsonFormat[Seq[T]] { + def read(value: JsValue): Seq[T] = value match { + case JsArray(elements) ⇒ elements.map(_.convertTo[T])(collection.breakOut) + case x ⇒ deserializationError("Expected Seq as JsArray, but got " + x) + } + def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) } - implicit object MetricDetailWriter extends JsonWriter[Metric] { + implicit object MetricDetailWriter extends JsonFormat[Metric] { + def read(json: JsValue): (MetricID, MetricData) = json match { + case JsArray(elements) ⇒ + val metricID = elements(0) match { + case JsObject(fields) ⇒ MetricID(fields("name").convertTo[String], fields.get("scope").map(_.convertTo[String])) + case x ⇒ deserializationError("Expected MetricID as JsObject, but got " + x) + } + + val metricData = elements(1) match { + case JsArray(dataElements) ⇒ + MetricData( + dataElements(0).convertTo[Long], + dataElements(1).convertTo[Double], + dataElements(2).convertTo[Double], + dataElements(3).convertTo[Double], + dataElements(4).convertTo[Double], + dataElements(5).convertTo[Double]) + case x ⇒ deserializationError("Expected MetricData as JsArray, but got " + x) + } + + (metricID, metricData) + + case x ⇒ deserializationError("Expected Metric as JsArray, but got " + x) + } + def write(obj: Metric): JsValue = { val (metricID, metricData) = obj + val nameAndScope = metricID.scope.foldLeft(Map("name" -> JsString(metricID.name)))((m, scope) ⇒ m + ("scope" -> JsString(scope))) JsArray( - JsObject( - "name" -> JsString(metricID.name) // TODO Include scope - ), + JsObject(nameAndScope), JsArray( JsNumber(metricData.callCount), JsNumber(metricData.total), @@ -54,7 +82,20 @@ object JsonProtocol extends DefaultJsonProtocol { } } - implicit object MetricBatchWriter extends RootJsonWriter[MetricBatch] { + implicit object MetricBatchWriter extends RootJsonFormat[MetricBatch] { + + def read(json: JsValue): MetricBatch = json match { + case JsArray(elements) ⇒ + val runID = elements(0).convertTo[Long] + val timeSliceFrom = elements(1).convertTo[Long] + val timeSliceTo = elements(2).convertTo[Long] + val metrics = elements(3).convertTo[Seq[Metric]] + + MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap)) + + case x ⇒ deserializationError("Expected Array as JsArray, but got " + x) + } + def write(obj: MetricBatch): JsValue = JsArray( JsNumber(obj.runID), diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index b09973ef..0aa078f5 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -14,8 +14,6 @@ import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed import spray.can.Http import spray.http.Uri import spray.httpx.SprayJsonSupport -import spray.json.CompactPrinter - import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -76,7 +74,7 @@ class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extend log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) collectorClient { - Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter)) + Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)) } map { response ⇒ if (response.status.isSuccess) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala new file mode 100644 index 00000000..713a5586 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala @@ -0,0 +1,59 @@ +package kamon.newrelic + +import java.lang.{ StringBuilder ⇒ JStringBuilder } +import spray.json._ +import scala.annotation.tailrec + +// We need a special treatment of / that needs to be escaped as \/ for New Relic to work properly with all metrics. +// Without this custom json printer the scoped metrics are not displayed in the site. + +// format: OFF +trait NewRelicJsonPrinter extends CompactPrinter { + + override def printString(s: String, sb: JStringBuilder) { + import NewRelicJsonPrinter._ + @tailrec def firstToBeEncoded(ix: Int = 0): Int = + if (ix == s.length) -1 else if (requiresEncoding(s.charAt(ix))) ix else firstToBeEncoded(ix + 1) + + sb.append('"') + firstToBeEncoded() match { + case -1 ⇒ sb.append(s) + case first ⇒ + sb.append(s, 0, first) + @tailrec def append(ix: Int): Unit = + if (ix < s.length) { + s.charAt(ix) match { + case c if !requiresEncoding(c) => sb.append(c) + case '"' => sb.append("\\\"") + case '\\' => sb.append("\\\\") + case '/' => sb.append("\\/") + case '\b' => sb.append("\\b") + case '\f' => sb.append("\\f") + case '\n' => sb.append("\\n") + case '\r' => sb.append("\\r") + case '\t' => sb.append("\\t") + case x if x <= 0xF => sb.append("\\u000").append(Integer.toHexString(x)) + case x if x <= 0xFF => sb.append("\\u00").append(Integer.toHexString(x)) + case x if x <= 0xFFF => sb.append("\\u0").append(Integer.toHexString(x)) + case x => sb.append("\\u").append(Integer.toHexString(x)) + } + append(ix + 1) + } + append(first) + } + sb.append('"') + } +} + +object NewRelicJsonPrinter extends NewRelicJsonPrinter { + + def requiresEncoding(c: Char): Boolean = + // from RFC 4627 + // unescaped = %x20-21 / %x23-5B / %x5D-10FFFF + c match { + case '"' ⇒ true + case '\\' ⇒ true + case '/' ⇒ true + case c ⇒ c < 0x20 + } +} \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 15eba982..0a4a516b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,8 +16,9 @@ package kamon.newrelic +import scala.collection.mutable; import kamon.metric._ -import kamon.metric.TraceMetrics.ElapsedTime +import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime } import kamon.metric.instrument.Histogram import kamon.trace.SegmentCategory.HttpClient import kamon.trace.SegmentMetricIdentity @@ -31,33 +32,65 @@ object WebTransactionMetricExtractor extends MetricExtractor { var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - val transactionMetrics = metrics.collect { - case (TraceMetrics(name), groupSnapshot) ⇒ - - groupSnapshot.metrics collect { - // Extract WebTransaction metrics and accumulate HttpDispatcher - case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext) - snapshot.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count) - } + val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] + val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] + val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] - Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit) + val transactionMetrics = metrics.collect { + case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ - // Extract all external services. + tms.segments.foreach { case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) - Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit) + // Accumulate externals by host + externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + + // Accumulate externals by host and library + externalByHostAndLibrarySnapshots.update((segmentName, library), + snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + + // Accumulate externals by host and library, including the transaction as scope. + externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), + snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + + } + + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) + tms.elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) } + + Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) } val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) - val webTransaction = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "WebTransaction", None, Scale.Unit) - val external = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External", None, Scale.Unit) + val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) + val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) + val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAll = externalAllWeb.copy(MetricID("External/all", None)) + + val externalByHost = externalByHostSnapshots.map { + case (host, snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + } + + val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { + case ((host, library), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + } + + val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { + case ((host, library, traceName), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + } - Map(httpDispatcher, webTransaction, external, externalAllWeb, apdexBuilder.build) ++ transactionMetrics.flatten.toMap + Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ + transactionMetrics ++ externalByHost ++ externalByHostAndLibrary ++ externalScopedByHostAndLibrary } } -- cgit v1.2.3