From 5b9bbb196734c47e67d69d48e378e196b205fd57 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 10 Nov 2014 04:22:05 +0100 Subject: + newrelic: report additional and scoped external service metrics, improves #63. --- .../main/scala/kamon/newrelic/JsonProtocol.scala | 53 +++++++++++++-- .../main/scala/kamon/newrelic/MetricReporter.scala | 4 +- .../scala/kamon/newrelic/NewRelicJsonPrinter.scala | 59 +++++++++++++++++ .../newrelic/WebTransactionMetricExtractor.scala | 67 ++++++++++++++----- .../scala/kamon/newrelic/MetricReporterSpec.scala | 77 ++++++++++------------ .../main/scala/test/SimpleRequestProcessor.scala | 9 ++- 6 files changed, 198 insertions(+), 71 deletions(-) create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala index c573d04d..26e8839e 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/JsonProtocol.scala @@ -32,18 +32,46 @@ object JsonProtocol extends DefaultJsonProtocol { "pid" -> JsNumber(obj.pid))) } - implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] { + implicit def seqWriter[T: JsonFormat] = new JsonFormat[Seq[T]] { + def read(value: JsValue): Seq[T] = value match { + case JsArray(elements) ⇒ elements.map(_.convertTo[T])(collection.breakOut) + case x ⇒ deserializationError("Expected Seq as JsArray, but got " + x) + } + def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) } - implicit object MetricDetailWriter extends JsonWriter[Metric] { + implicit object MetricDetailWriter extends JsonFormat[Metric] { + def read(json: JsValue): (MetricID, MetricData) = json match { + case JsArray(elements) ⇒ + val metricID = elements(0) match { + case JsObject(fields) ⇒ MetricID(fields("name").convertTo[String], fields.get("scope").map(_.convertTo[String])) + case x ⇒ deserializationError("Expected MetricID as JsObject, but got " + x) + } + + val metricData = elements(1) match { + case JsArray(dataElements) ⇒ + MetricData( + dataElements(0).convertTo[Long], + dataElements(1).convertTo[Double], + dataElements(2).convertTo[Double], + dataElements(3).convertTo[Double], + dataElements(4).convertTo[Double], + dataElements(5).convertTo[Double]) + case x ⇒ deserializationError("Expected MetricData as JsArray, but got " + x) + } + + (metricID, metricData) + + case x ⇒ deserializationError("Expected Metric as JsArray, but got " + x) + } + def write(obj: Metric): JsValue = { val (metricID, metricData) = obj + val nameAndScope = metricID.scope.foldLeft(Map("name" -> JsString(metricID.name)))((m, scope) ⇒ m + ("scope" -> JsString(scope))) JsArray( - JsObject( - "name" -> JsString(metricID.name) // TODO Include scope - ), + JsObject(nameAndScope), JsArray( JsNumber(metricData.callCount), JsNumber(metricData.total), @@ -54,7 +82,20 @@ object JsonProtocol extends DefaultJsonProtocol { } } - implicit object MetricBatchWriter extends RootJsonWriter[MetricBatch] { + implicit object MetricBatchWriter extends RootJsonFormat[MetricBatch] { + + def read(json: JsValue): MetricBatch = json match { + case JsArray(elements) ⇒ + val runID = elements(0).convertTo[Long] + val timeSliceFrom = elements(1).convertTo[Long] + val timeSliceTo = elements(2).convertTo[Long] + val metrics = elements(3).convertTo[Seq[Metric]] + + MetricBatch(runID, TimeSliceMetrics(timeSliceFrom, timeSliceTo, metrics.toMap)) + + case x ⇒ deserializationError("Expected Array as JsArray, but got " + x) + } + def write(obj: MetricBatch): JsValue = JsArray( JsNumber(obj.runID), diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala index b09973ef..0aa078f5 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricReporter.scala @@ -14,8 +14,6 @@ import kamon.newrelic.MetricReporter.{ UnexpectedStatusCodeException, PostFailed import spray.can.Http import spray.http.Uri import spray.httpx.SprayJsonSupport -import spray.json.CompactPrinter - import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -76,7 +74,7 @@ class MetricReporter(settings: Agent.Settings, runID: Long, baseUri: Uri) extend log.debug("Sending [{}] metrics to New Relic for the time slice between {} and {}.", slice.metrics.size, slice.from, slice.to) collectorClient { - Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, CompactPrinter)) + Post(metricDataUri, MetricBatch(runID, slice))(sprayJsonMarshaller(MetricBatchWriter, NewRelicJsonPrinter)) } map { response ⇒ if (response.status.isSuccess) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala new file mode 100644 index 00000000..713a5586 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicJsonPrinter.scala @@ -0,0 +1,59 @@ +package kamon.newrelic + +import java.lang.{ StringBuilder ⇒ JStringBuilder } +import spray.json._ +import scala.annotation.tailrec + +// We need a special treatment of / that needs to be escaped as \/ for New Relic to work properly with all metrics. +// Without this custom json printer the scoped metrics are not displayed in the site. + +// format: OFF +trait NewRelicJsonPrinter extends CompactPrinter { + + override def printString(s: String, sb: JStringBuilder) { + import NewRelicJsonPrinter._ + @tailrec def firstToBeEncoded(ix: Int = 0): Int = + if (ix == s.length) -1 else if (requiresEncoding(s.charAt(ix))) ix else firstToBeEncoded(ix + 1) + + sb.append('"') + firstToBeEncoded() match { + case -1 ⇒ sb.append(s) + case first ⇒ + sb.append(s, 0, first) + @tailrec def append(ix: Int): Unit = + if (ix < s.length) { + s.charAt(ix) match { + case c if !requiresEncoding(c) => sb.append(c) + case '"' => sb.append("\\\"") + case '\\' => sb.append("\\\\") + case '/' => sb.append("\\/") + case '\b' => sb.append("\\b") + case '\f' => sb.append("\\f") + case '\n' => sb.append("\\n") + case '\r' => sb.append("\\r") + case '\t' => sb.append("\\t") + case x if x <= 0xF => sb.append("\\u000").append(Integer.toHexString(x)) + case x if x <= 0xFF => sb.append("\\u00").append(Integer.toHexString(x)) + case x if x <= 0xFFF => sb.append("\\u0").append(Integer.toHexString(x)) + case x => sb.append("\\u").append(Integer.toHexString(x)) + } + append(ix + 1) + } + append(first) + } + sb.append('"') + } +} + +object NewRelicJsonPrinter extends NewRelicJsonPrinter { + + def requiresEncoding(c: Char): Boolean = + // from RFC 4627 + // unescaped = %x20-21 / %x23-5B / %x5D-10FFFF + c match { + case '"' ⇒ true + case '\\' ⇒ true + case '/' ⇒ true + case c ⇒ c < 0x20 + } +} \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala index 15eba982..0a4a516b 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetricExtractor.scala @@ -16,8 +16,9 @@ package kamon.newrelic +import scala.collection.mutable; import kamon.metric._ -import kamon.metric.TraceMetrics.ElapsedTime +import kamon.metric.TraceMetrics.{ TraceMetricsSnapshot, ElapsedTime } import kamon.metric.instrument.Histogram import kamon.trace.SegmentCategory.HttpClient import kamon.trace.SegmentMetricIdentity @@ -31,33 +32,65 @@ object WebTransactionMetricExtractor extends MetricExtractor { var accumulatedHttpDispatcher: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) var accumulatedExternalServices: Histogram.Snapshot = Histogram.Snapshot.empty(Scale.Nano) - val transactionMetrics = metrics.collect { - case (TraceMetrics(name), groupSnapshot) ⇒ - - groupSnapshot.metrics collect { - // Extract WebTransaction metrics and accumulate HttpDispatcher - case (ElapsedTime, snapshot: Histogram.Snapshot) ⇒ - accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(snapshot, collectionContext) - snapshot.recordsIterator.foreach { record ⇒ - apdexBuilder.record(Scale.convert(snapshot.scale, Scale.Unit, record.level), record.count) - } + val externalByHostSnapshots = mutable.Map.empty[String, List[Histogram.Snapshot]] + val externalByHostAndLibrarySnapshots = mutable.Map.empty[(String, String), List[Histogram.Snapshot]] + val externalScopedByHostAndLibrarySnapshots = mutable.Map.empty[(String, String, String), List[Histogram.Snapshot]] - Metric.fromKamonMetricSnapshot(snapshot, s"WebTransaction/Custom/$name", None, Scale.Unit) + val transactionMetrics = metrics.collect { + case (TraceMetrics(traceName), tms: TraceMetricsSnapshot) ⇒ - // Extract all external services. + tms.segments.foreach { case (SegmentMetricIdentity(segmentName, category, library), snapshot: Histogram.Snapshot) if category.equals(HttpClient) ⇒ accumulatedExternalServices = accumulatedExternalServices.merge(snapshot, collectionContext) - Metric.fromKamonMetricSnapshot(snapshot, s"External/$segmentName/all", None, Scale.Unit) + // Accumulate externals by host + externalByHostSnapshots.update(segmentName, snapshot :: externalByHostSnapshots.getOrElse(segmentName, Nil)) + + // Accumulate externals by host and library + externalByHostAndLibrarySnapshots.update((segmentName, library), + snapshot :: externalByHostAndLibrarySnapshots.getOrElse((segmentName, library), Nil)) + + // Accumulate externals by host and library, including the transaction as scope. + externalScopedByHostAndLibrarySnapshots.update((segmentName, library, traceName), + snapshot :: externalScopedByHostAndLibrarySnapshots.getOrElse((segmentName, library, traceName), Nil)) + + } + + accumulatedHttpDispatcher = accumulatedHttpDispatcher.merge(tms.elapsedTime, collectionContext) + tms.elapsedTime.recordsIterator.foreach { record ⇒ + apdexBuilder.record(Scale.convert(tms.elapsedTime.scale, Scale.Unit, record.level), record.count) } + + Metric.fromKamonMetricSnapshot(tms.elapsedTime, "WebTransaction/Custom/" + traceName, None, Scale.Unit) } val httpDispatcher = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "HttpDispatcher", None, Scale.Unit) - val webTransaction = Metric.fromKamonMetricSnapshot(accumulatedHttpDispatcher, "WebTransaction", None, Scale.Unit) - val external = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External", None, Scale.Unit) + val webTransaction = httpDispatcher.copy(MetricID("WebTransaction", None)) + val webTransactionTotal = httpDispatcher.copy(MetricID("WebTransactionTotalTime", None)) + val externalAllWeb = Metric.fromKamonMetricSnapshot(accumulatedExternalServices, "External/allWeb", None, Scale.Unit) + val externalAll = externalAllWeb.copy(MetricID("External/all", None)) + + val externalByHost = externalByHostSnapshots.map { + case (host, snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/all", None, Scale.Unit) + } + + val externalByHostAndLibrary = externalByHostAndLibrarySnapshots.map { + case ((host, library), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", None, Scale.Unit) + } + + val externalScopedByHostAndLibrary = externalScopedByHostAndLibrarySnapshots.map { + case ((host, library, traceName), snapshots) ⇒ + val mergedSnapshots = snapshots.foldLeft(Histogram.Snapshot.empty(Scale.Nano))(_.merge(_, collectionContext)) + Metric.fromKamonMetricSnapshot(mergedSnapshots, s"External/$host/$library", Some("WebTransaction/Custom/" + traceName), Scale.Unit) + } - Map(httpDispatcher, webTransaction, external, externalAllWeb, apdexBuilder.build) ++ transactionMetrics.flatten.toMap + Map(httpDispatcher, webTransaction, webTransactionTotal, externalAllWeb, externalAll, apdexBuilder.build) ++ + transactionMetrics ++ externalByHost ++ externalByHostAndLibrary ++ externalScopedByHostAndLibrary } } diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala index 3cf4bbd0..0001072e 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -23,15 +23,16 @@ import com.typesafe.config.ConfigFactory import kamon.metric.{ TraceMetrics, Metrics } import kamon.{ Kamon, AkkaExtensionSwap } import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.WordSpecLike +import org.scalatest.{ Matchers, WordSpecLike } import spray.can.Http import spray.http.Uri.Query import spray.http._ import spray.httpx.encoding.Deflate import spray.httpx.{ RequestBuilding, SprayJsonSupport } import scala.concurrent.duration._ +import spray.json._ -class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuilding with SprayJsonSupport { +class MetricReporterSpec extends TestKitBase with WordSpecLike with Matchers with RequestBuilding with SprayJsonSupport { import kamon.newrelic.JsonProtocol._ implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString( @@ -61,21 +62,21 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) metricReporter ! firstSnapshot - httpManager.expectMsg(Deflate.encode { - HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity( - s""" - |[9999,0,0, - |[ - | [{"name":"Apdex"},[3,0.0,0.0,1.0,1.0,0.0]], - | [{"name":"WebTransaction"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], - | [{"name":"External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], - | [{"name":"WebTransaction/Custom/example-trace"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], - | [{"name":"HttpDispatcher"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], - | [{"name":"External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]] - |] - |] - """.stripMargin)) - }) + val metricPost = httpManager.expectMsgType[HttpRequest] + + metricPost.method should be(HttpMethods.POST) + metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data")) + metricPost.encoding should be(HttpEncodings.deflate) + + val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] + postedBatch.runID should be(9999) + postedBatch.timeSliceMetrics.from should be(1415587618) + postedBatch.timeSliceMetrics.to should be(1415587678) + + val metrics = postedBatch.timeSliceMetrics.metrics + metrics(MetricID("Apdex", None)).callCount should be(3) + metrics(MetricID("WebTransaction", None)).callCount should be(3) + metrics(MetricID("HttpDispatcher", None)).callCount should be(3) } "accumulate metrics if posting fails" in new FakeTickSnapshotsFixture { @@ -87,31 +88,23 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild httpManager.reply(Timedout(request)) metricReporter ! secondSnapshot - httpManager.expectMsg(Deflate.encode { - HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity( - s""" - |[9999,0,0, - |[ - | [{"name":"Apdex"},[6,0.0,0.0,1.0,1.0,0.0]], - | [{"name":"WebTransaction"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], - | [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], - | [{"name":"WebTransaction/Custom/example-trace"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], - | [{"name":"HttpDispatcher"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], - | [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]] - |] - |] - """.stripMargin)) - }) + val metricPost = httpManager.expectMsgType[HttpRequest] + + metricPost.method should be(HttpMethods.POST) + metricPost.uri should be(rawMethodUri("collector-1.newrelic.com", "metric_data")) + metricPost.encoding should be(HttpEncodings.deflate) + + val postedBatch = Deflate.decode(metricPost).entity.asString.parseJson.convertTo[MetricBatch] + postedBatch.runID should be(9999) + postedBatch.timeSliceMetrics.from should be(1415587618) + postedBatch.timeSliceMetrics.to should be(1415587738) + + val metrics = postedBatch.timeSliceMetrics.metrics + metrics(MetricID("Apdex", None)).callCount should be(6) + metrics(MetricID("WebTransaction", None)).callCount should be(6) + metrics(MetricID("HttpDispatcher", None)).callCount should be(6) } } - /* - [9999, 0, 0, [ - [{"name": "Apdex"}, [6, 0.0, 0.0, 1.0, 1.0, 0.0]], - [{"name": "WebTransaction"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], - [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], - [{"name": "WebTransaction/Custom/example-trace"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], - [{"name": "HttpDispatcher"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], - [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]]]]*/ def setHttpManager(probe: TestProbe): TestProbe = { AkkaExtensionSwap.swap(system, Http, new IO.Extension { @@ -146,11 +139,11 @@ class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuild recorder.elapsedTime.record(1000000) recorder.elapsedTime.record(2000000) recorder.elapsedTime.record(3000000) - val firstSnapshot = TickMetricSnapshot(1, 100, Map(testTraceID -> collectRecorder)) + val firstSnapshot = TickMetricSnapshot(1415587618000L, 1415587678000L, Map(testTraceID -> collectRecorder)) recorder.elapsedTime.record(6000000) recorder.elapsedTime.record(5000000) recorder.elapsedTime.record(4000000) - val secondSnapshot = TickMetricSnapshot(100, 200, Map(testTraceID -> collectRecorder)) + val secondSnapshot = TickMetricSnapshot(1415587678000L, 1415587738000L, Map(testTraceID -> collectRecorder)) } } \ No newline at end of file diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index da6a3cbe..7d7494eb 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -83,8 +83,11 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("site") { - complete { - pipeline(Get("http://localhost:9090/site-redirect")) + traceName("FinalGetSite-3") { + complete { + for (f1 <- pipeline(Get("http://127.0.0.1:9090/ok")); + f2 <- pipeline(Get("http://www.google.com/search?q=mkyong"))) yield "Ok Double Future" + } } } ~ path("site-redirect") { @@ -99,7 +102,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("ok") { - traceName("OK") { + traceName("RespondWithOK-3") { complete { "ok" } -- cgit v1.2.3